Infinispanのようなインメモリ・データグリッドで、Cache#sizeやkeySetみたいなCache内のエントリ全体に波及しそうな操作は、基本的にやらない方がいいというイメージです。
Infinispanというか、JBoss Data Gridのドキュメントにも以下のように書かれていますし。
1.8.3. Map メソッドの制限
size()、 values()、 keySet()、 entrySet() など特定の Map メソッドは不安定であるため、JBoss Data Grid で一定の制限を用いて使用することが可能です。これらのメソッドはロック (グローバルまたはローカル) を取得せず、同時編集、追加および削除はこれらの呼び出しでは考慮されません。さらに、前述のメソッドはローカルのデータコンテナ上でのみ操作可能で、ステートのグローバルビューは提供しません。
https://access.redhat.com/site/documentation/ja-JP/Red_Hat_JBoss_Data_Grid/6/html-single/Administration_and_Configuration_Guide/index.html#Limitations_of_Map_Methods
前述のメソッドがグローバルに実行されると、パフォーマンスに大きく影響し、スケーラビリティーのボトルネックが発生します。そのため、情報収集やデバッグの目的でのみこれらのメソッドを使用することが推奨されます。
InfinispanのCacheインターフェースのJavadocにも、だいたい同じようなことが書かれています。
それになにより、自分自身が実際にCacheに存在するエントリ数とsizeやkeySetの内容が一致しないケースがあることを、これまで見ていますので。
Infinispanを使った確認のコードサンプルを書く時に、あらかじめキーの一覧を保持するようなコードを書いているのは、これが理由です。
で、ここらでひとつ実際の挙動をちゃんと確認しようと思いまして。
準備
まずは、ビルドの準備。
build.sbt
name := "infinispan-cache-keys" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.3" organization := "littlewings" resolvers += "Public JBoss Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss" libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "6.0.0.Final" excludeAll( ExclusionRule(organization = "org.jgroups", name = "jgroups"), ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"), ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"), ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"), ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec") ), "org.jgroups" % "jgroups" % "3.4.1.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final", "org.jboss.marshalling" % "jboss-marshalling-river" % "1.3.18.GA", "org.jboss.marshalling" % "jboss-marshalling" % "1.3.18.GA", "org.jboss.logging" % "jboss-logging" % "3.1.2.GA", "net.jcip" % "jcip-annotations" % "1.0" )
設定ファイルも用意。
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd" xmlns="urn:infinispan:config:6.0"> <global> <transport clusterName="cache-keys-cluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" /> <shutdown hookBehavior="REGISTER"/> </global> <default> <clustering mode="distribution" /> </default> </infinispan>
簡単な、Distibution Cacheを使ったクラスタを組みます。JGroupsの設定は、例のごとく端折ります。
で、浮いていてもらうサーバをちょっと用意。
src/main/scala/EmbeddedCacheServer.scala
import org.infinispan.manager.DefaultCacheManager object EmbeddedCacheServer { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") try { val cache = manager.getCache[String, String]() readLine("Server Startup.") } finally { manager.stop() } } }
Enterを押すと、終了しちゃいますね。
そしてもうひとつ。
src/main/scala/InfinispanCacheKeys.scala
import scala.collection.JavaConverters._ import org.infinispan.manager.DefaultCacheManager object InfinispanCacheKeys { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") try { val cache = manager.getCache[String, String]() (1 to 10) foreach { i => cache.put(s"key$i", s"value$i") } println(s"Size => ${cache.size}") cache .keySet .asScala .toSeq .sortWith(_.drop(3).toInt < _.drop(3).toInt) .foreach(println) } finally { manager.stop() } } }
Cacheにデータを登録して、現在のsizeとkeySetを確認するクラスです。
確認してみる
では、まず浮いててもらうサーバを、ひとつ起動します。
$ sbt "runMain EmbeddedCacheServer"
サーバが起動したのを見たら、先ほどのプログラムを実行します。
> runMain InfinispanCacheKeys
クラスタに組み込まれた後、こんな結果が出力されます。
Size => 10 key1 key2 key3 key4 key5 key6 key7 key8 key9 key10
普通に全部見えていますね。
ここでこの結果を出力したプログラムは終了してしまいますが、追加でもうひとつサーバを起動します。
$ sbt "runMain EmbeddedCacheServer"
これで、クラスタに2つのNodeが現在存在します。
先ほどのプログラムを、もう1度起動します。
> runMain InfinispanCacheKeys
結果。
Size => 7 key1 key4 key5 key6 key7 key8 key9
明らかに数が減りました。さて、いったいどれが見えなくなっているのでしょう?
見えなくなったキーを確認する
先ほどのsizeとキーを出力していたプログラムに対して、キーの一覧を出力後に以下のようなコードを追加します。
val dm = cache.getAdvancedCache.getDistributionManager val selfNode = manager.getAddress println(s"Self Node = $selfNode") (1 to 10) foreach { i => val key = s"key$i" val primary = dm.getPrimaryLocation(key) val locate = dm.locate(key) val notSelf = (primary != selfNode) && !locate.contains(selfNode) val prefix = if (notSelf) "***** " else "" println(s"${prefix}Key = $key, PL[$primary], Locate$locate") }
バックアップを含んだエントリの配置先を取得して、バックアップを含めて自分が持っていないエントリについては、Prefixとして「*****」と出力します。
では、再度実行。
> runMain InfinispanCacheKeys
結果。
Size => 7 key1 key4 key5 key6 key7 key8 key9
sizeは7で、2、3、10のキーが見えていません。
ここで、自分自身のNodeの名前はこんな感じです。
Self Node = xxxxx-38911
この後に続く出力は、こうなります。
Key = key1, PL[xxxxx-162], Locate[xxxxx-162, xxxxx-38911] ***** Key = key2, PL[xxxxx-162], Locate[xxxxx-162, xxxxx-63722] ***** Key = key3, PL[xxxxx-162], Locate[xxxxx-162, xxxxx-63722] Key = key4, PL[xxxxx-38911], Locate[xxxxx-38911, xxxxx-63722] Key = key5, PL[xxxxx-63722], Locate[xxxxx-63722, xxxxx-38911] Key = key6, PL[xxxxx-162], Locate[xxxxx-162, xxxxx-38911] Key = key7, PL[xxxxx-38911], Locate[xxxxx-38911, xxxxx-162] Key = key8, PL[xxxxx-162], Locate[xxxxx-162, xxxxx-38911] Key = key9, PL[xxxxx-63722], Locate[xxxxx-63722, xxxxx-38911] ***** Key = key10, PL[xxxxx-63722], Locate[xxxxx-63722, xxxxx-162]
2、3、10に「*****」が付きましたね。つまり、自分自身がプライマリでもバックアップでもないエントリについては、sizeやkeySetなどのメソッドでは見えないということですね。
InfinispanのMap Reduce Framework内部では、普通にkeySetを使っているので、どうなんだろうと思っていましたが、全クラスタのエントリを舐めているわけではないことの確信が持てました。
最大数で、プライマリ/バックアップ数分ですね。
*Map Reduce Frameworkでは、その後プライマリのキーだけを選んで処理を行います
ちょっと、納得できました。そして、結果が安定しないことも理解できました。