Infinispanのような、Node Discoveryの仕組みがあり、サーバを起動するとクラスタのメンバーに自動的に組み込まれるようなタイプのものについて、サーバ停止時はどうするのかなぁ?という点について。
いや、別に面白いオチはなかったんですが…。
確認のため、以下のような設定ファイルをプログラムを用意します。
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:5.3 http://www.infinispan.org/schemas/infinispan-config-5.3.xsd" xmlns="urn:infinispan:config:5.3"> <global> <transport clusterName="sample-cluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" /> </global> <namedCache name="distCache"> <jmxStatistics enabled="true" /> <clustering mode="distribution"> <hash numOwners="1" /> </clustering> </namedCache> </infinispan>
JGroupsの設定は、端折ります。また、クラスタ内でのコピー数は
<hash numOwners="1" />
として、バックアップなしとしています。
プログラムの方は
src/main/scala/EmbeddedCacheServer.scala
import org.infinispan.Cache import org.infinispan.manager.DefaultCacheManager object EmbeddedCacheServer { val allKeys: Seq[Int] = 1 to 20 def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[String, String]("distCache") val keys = args.toList.headOption match { case Some("node1") => allKeys.take(10) case Some("node2") => allKeys.drop(10) case _ => println("Please Input node1 or node2") sys.exit(1) } try { keys.foreach { i => cache.put(s"key$i", s"value$i") } println("Initialized.") communicateWhile(cache, keys) } finally { manager.stop() println("CacheManager Stopped.") } } def communicateWhile(cache: Cache[String, String], keys: Seq[Int]): Unit = Iterator .continually(readLine()) .withFilter(l => l != null && !l.isEmpty) .takeWhile(_ != "exit") .foreach { command => command.split("""\s+""").toList match { case "mylist" :: Nil => keys.foreach { i => val key = s"key$i" println(s"Key[$key] = ${cache.get(key)}") } case "list" :: Nil => allKeys.foreach { i => val key = s"key$i" println(s"Key[$key] = ${cache.get(key)}") } case "locate" :: Nil => val dm = cache.getAdvancedCache.getDistributionManager allKeys.foreach { i => val key = s"key$i" println(s"Key[$key]: PrimaryLocation = ${dm.getPrimaryLocation(key)}, Locate = ${dm.locate(key)}") } case "stop" :: Nil => cache.stop() //cache.getCacheManager.removeCache(cache.getName) println("Cache Instance Stopped.") case _ => println(s"Unkwon Command[$command]") } } }
みたいな感じで、とりあえず2つのサーバを前提に「node1」「node2」のどちらかを指定してNodeの決定と管理するキーの範囲を決め、あとはコマンドでキー全部と値、自分が持つキーと値、キーの配置場所、キャッシュの停止、プログラムの終了ができるようなコマンドを用意しました。
では、サーバを2つ起動してみます。
# ひとつめ $ sbt "run node1" # ふたつめ $ sbt "run node2"
Node1なら、こんな感じで現在の状況が確認できます。
mylist Key[key1] = value1 Key[key2] = value2 Key[key3] = value3 Key[key4] = value4 Key[key5] = value5 Key[key6] = value6 Key[key7] = value7 Key[key8] = value8 Key[key9] = value9 Key[key10] = value10 list Key[key1] = value1 Key[key2] = value2 Key[key3] = value3 Key[key4] = value4 Key[key5] = value5 Key[key6] = value6 Key[key7] = value7 Key[key8] = value8 Key[key9] = value9 Key[key10] = value10 Key[key11] = value11 Key[key12] = value12 Key[key13] = value13 Key[key14] = value14 Key[key15] = value15 Key[key16] = value16 Key[key17] = value17 Key[key18] = value18 Key[key19] = value19 Key[key20] = value20
Node2なら、こんな感じ。
mylist Key[key11] = value11 Key[key12] = value12 Key[key13] = value13 Key[key14] = value14 Key[key15] = value15 Key[key16] = value16 Key[key17] = value17 Key[key18] = value18 Key[key19] = value19 Key[key20] = value20 list Key[key1] = value1 Key[key2] = value2 Key[key3] = value3 Key[key4] = value4 Key[key5] = value5 Key[key6] = value6 Key[key7] = value7 Key[key8] = value8 Key[key9] = value9 Key[key10] = value10 Key[key11] = value11 Key[key12] = value12 Key[key13] = value13 Key[key14] = value14 Key[key15] = value15 Key[key16] = value16 Key[key17] = value17 Key[key18] = value18 Key[key19] = value19 Key[key20] = value20
では、Node2を停止してみます。
stop
Cache Instance Stopped.
で、Node1で現状を見ると…
mylist Key[key1] = value1 Key[key2] = value2 Key[key3] = value3 Key[key4] = value4 Key[key5] = null Key[key6] = value6 Key[key7] = value7 Key[key8] = value8 Key[key9] = null Key[key10] = null list Key[key1] = value1 Key[key2] = value2 Key[key3] = value3 Key[key4] = value4 Key[key5] = null Key[key6] = value6 Key[key7] = value7 Key[key8] = value8 Key[key9] = null Key[key10] = null Key[key11] = value11 Key[key12] = value12 Key[key13] = null Key[key14] = null Key[key15] = value15 Key[key16] = null Key[key17] = null Key[key18] = null Key[key19] = value19 Key[key20] = value20
というわけで、停止したNode2のデータがなくなります。停止したNodeで、Cache#startとしてもエントリが復活するわけではありません。
ちなみに、もともとCache#stopしていた部分を
//cache.stop()
cache.getCacheManager.removeCache(cache.getName)
と変えると、
java.lang.IllegalStateException: Cache 'distCache' is in 'TERMINATED' state and so it does not accept new invocations. Either restart it or recreate the cache container. at org.infinispan.interceptors.InvocationContextInterceptor.handleAll(InvocationContextInterceptor.java:110) at org.infinispan.interceptors.InvocationContextInterceptor.handleDefault(InvocationContextInterceptor.java:92) at org.infinispan.commands.AbstractVisitor.visitGetKeyValueCommand(AbstractVisitor.java:96) at org.infinispan.commands.read.GetKeyValueCommand.acceptVisitor(GetKeyValueCommand.java:62) at org.infinispan.interceptors.InterceptorChain.invoke(InterceptorChain.java:343) at org.infinispan.CacheImpl.get(CacheImpl.java:391) at org.infinispan.CacheImpl.get(CacheImpl.java:383) at EmbeddedCacheServer$$anonfun$communicateWhile$4$$anonfun$apply$2.apply$mcVI$sp(EmbeddedCacheServer.scala:47) at EmbeddedCacheServer$$anonfun$communicateWhile$4$$anonfun$apply$2.apply(EmbeddedCacheServer.scala:45) at EmbeddedCacheServer$$anonfun$communicateWhile$4$$anonfun$apply$2.apply(EmbeddedCacheServer.scala:45) at scala.collection.immutable.Range.foreach(Range.scala:141) at EmbeddedCacheServer$$anonfun$communicateWhile$4.apply(EmbeddedCacheServer.scala:45) at EmbeddedCacheServer$$anonfun$communicateWhile$4.apply(EmbeddedCacheServer.scala:38) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at EmbeddedCacheServer$.communicateWhile(EmbeddedCacheServer.scala:38) at EmbeddedCacheServer$.main(EmbeddedCacheServer.scala:26) at EmbeddedCacheServer.main(EmbeddedCacheServer.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606)
となり、こちらはクラスタからCacheの定義そのものが削除されることになります。
というわけで、停止が発生しうるケースで、データが消失して欲しくない時はnumOwnersを
<hash numOwners="2" />
のように、2以上に増やしたりCache Loaderを設定して永続化したりするんでしょうね。
というわけで、あまり面白いオチはありませんでした。Cache#stop時に、Entryの移動とかあるのかなぁ?と思ってみたのですが、そういうわけでもなさそうです。
Dynamically Start and Stop Clustered Cache[Library Mode]
http://infinispan.org/docs/5.3.x/user_guide/user_guide.html#_library_mode