CLOVER🍀

That was when it all began.

Infinispan停止時にCacheのエントリがどうなるのか、改めて確認する

Infinispanのような、Node Discoveryの仕組みがあり、サーバを起動するとクラスタのメンバーに自動的に組み込まれるようなタイプのものについて、サーバ停止時はどうするのかなぁ?という点について。

いや、別に面白いオチはなかったんですが…。

確認のため、以下のような設定ファイルをプログラムを用意します。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:5.3 http://www.infinispan.org/schemas/infinispan-config-5.3.xsd"
    xmlns="urn:infinispan:config:5.3">

  <global>
    <transport clusterName="sample-cluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        />
  </global>

  <namedCache name="distCache">
    <jmxStatistics enabled="true" />
    <clustering mode="distribution">
      <hash numOwners="1" />
    </clustering>
  </namedCache>
</infinispan>

JGroupsの設定は、端折ります。また、クラスタ内でのコピー数は

      <hash numOwners="1" />

として、バックアップなしとしています。

プログラムの方は
src/main/scala/EmbeddedCacheServer.scala

import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager

object EmbeddedCacheServer {
  val allKeys: Seq[Int] = 1 to 20

  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]("distCache")

    val keys = args.toList.headOption match {
      case Some("node1") => allKeys.take(10)
      case Some("node2") => allKeys.drop(10)
      case _ => 
        println("Please Input node1 or node2")
        sys.exit(1)
    }

    try {
      keys.foreach { i =>
        cache.put(s"key$i", s"value$i")
      }

      println("Initialized.")

      communicateWhile(cache, keys)
    } finally {
      manager.stop()
      println("CacheManager Stopped.")
    }
  }

  def communicateWhile(cache: Cache[String, String], keys: Seq[Int]): Unit =
    Iterator
      .continually(readLine())
      .withFilter(l => l != null && !l.isEmpty)
      .takeWhile(_ != "exit")
      .foreach { command => command.split("""\s+""").toList match {
          case "mylist" :: Nil =>
            keys.foreach { i =>
              val key = s"key$i"
              println(s"Key[$key] = ${cache.get(key)}")
            }
          case "list" :: Nil =>
            allKeys.foreach { i =>
              val key = s"key$i"
              println(s"Key[$key] = ${cache.get(key)}")
            }
          case "locate" :: Nil =>
            val dm = cache.getAdvancedCache.getDistributionManager
            allKeys.foreach { i =>
              val key = s"key$i"
              println(s"Key[$key]: PrimaryLocation = ${dm.getPrimaryLocation(key)}, Locate = ${dm.locate(key)}")
            }
          case "stop" :: Nil =>
            cache.stop()
            //cache.getCacheManager.removeCache(cache.getName)
            println("Cache Instance Stopped.")
          case _ => println(s"Unkwon Command[$command]")
        }
      }
}

みたいな感じで、とりあえず2つのサーバを前提に「node1」「node2」のどちらかを指定してNodeの決定と管理するキーの範囲を決め、あとはコマンドでキー全部と値、自分が持つキーと値、キーの配置場所、キャッシュの停止、プログラムの終了ができるようなコマンドを用意しました。

では、サーバを2つ起動してみます。

# ひとつめ
$ sbt "run node1"

# ふたつめ
$ sbt "run node2"

Node1なら、こんな感じで現在の状況が確認できます。

mylist
Key[key1] = value1
Key[key2] = value2
Key[key3] = value3
Key[key4] = value4
Key[key5] = value5
Key[key6] = value6
Key[key7] = value7
Key[key8] = value8
Key[key9] = value9
Key[key10] = value10


list
Key[key1] = value1
Key[key2] = value2
Key[key3] = value3
Key[key4] = value4
Key[key5] = value5
Key[key6] = value6
Key[key7] = value7
Key[key8] = value8
Key[key9] = value9
Key[key10] = value10
Key[key11] = value11
Key[key12] = value12
Key[key13] = value13
Key[key14] = value14
Key[key15] = value15
Key[key16] = value16
Key[key17] = value17
Key[key18] = value18
Key[key19] = value19
Key[key20] = value20

Node2なら、こんな感じ。

mylist
Key[key11] = value11
Key[key12] = value12
Key[key13] = value13
Key[key14] = value14
Key[key15] = value15
Key[key16] = value16
Key[key17] = value17
Key[key18] = value18
Key[key19] = value19
Key[key20] = value20


list
Key[key1] = value1
Key[key2] = value2
Key[key3] = value3
Key[key4] = value4
Key[key5] = value5
Key[key6] = value6
Key[key7] = value7
Key[key8] = value8
Key[key9] = value9
Key[key10] = value10
Key[key11] = value11
Key[key12] = value12
Key[key13] = value13
Key[key14] = value14
Key[key15] = value15
Key[key16] = value16
Key[key17] = value17
Key[key18] = value18
Key[key19] = value19
Key[key20] = value20

では、Node2を停止してみます。

stop
Cache Instance Stopped.

で、Node1で現状を見ると…

mylist
Key[key1] = value1
Key[key2] = value2
Key[key3] = value3
Key[key4] = value4
Key[key5] = null
Key[key6] = value6
Key[key7] = value7
Key[key8] = value8
Key[key9] = null
Key[key10] = null


list
Key[key1] = value1
Key[key2] = value2
Key[key3] = value3
Key[key4] = value4
Key[key5] = null
Key[key6] = value6
Key[key7] = value7
Key[key8] = value8
Key[key9] = null
Key[key10] = null
Key[key11] = value11
Key[key12] = value12
Key[key13] = null
Key[key14] = null
Key[key15] = value15
Key[key16] = null
Key[key17] = null
Key[key18] = null
Key[key19] = value19
Key[key20] = value20

というわけで、停止したNode2のデータがなくなります。停止したNodeで、Cache#startとしてもエントリが復活するわけではありません。

ちなみに、もともとCache#stopしていた部分を

            //cache.stop()
            cache.getCacheManager.removeCache(cache.getName)

と変えると、

java.lang.IllegalStateException: Cache 'distCache' is in 'TERMINATED' state and so it does not accept new invocations. Either restart it or recreate the cache container.
	at org.infinispan.interceptors.InvocationContextInterceptor.handleAll(InvocationContextInterceptor.java:110)
	at org.infinispan.interceptors.InvocationContextInterceptor.handleDefault(InvocationContextInterceptor.java:92)
	at org.infinispan.commands.AbstractVisitor.visitGetKeyValueCommand(AbstractVisitor.java:96)
	at org.infinispan.commands.read.GetKeyValueCommand.acceptVisitor(GetKeyValueCommand.java:62)
	at org.infinispan.interceptors.InterceptorChain.invoke(InterceptorChain.java:343)
	at org.infinispan.CacheImpl.get(CacheImpl.java:391)
	at org.infinispan.CacheImpl.get(CacheImpl.java:383)
	at EmbeddedCacheServer$$anonfun$communicateWhile$4$$anonfun$apply$2.apply$mcVI$sp(EmbeddedCacheServer.scala:47)
	at EmbeddedCacheServer$$anonfun$communicateWhile$4$$anonfun$apply$2.apply(EmbeddedCacheServer.scala:45)
	at EmbeddedCacheServer$$anonfun$communicateWhile$4$$anonfun$apply$2.apply(EmbeddedCacheServer.scala:45)
	at scala.collection.immutable.Range.foreach(Range.scala:141)
	at EmbeddedCacheServer$$anonfun$communicateWhile$4.apply(EmbeddedCacheServer.scala:45)
	at EmbeddedCacheServer$$anonfun$communicateWhile$4.apply(EmbeddedCacheServer.scala:38)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at EmbeddedCacheServer$.communicateWhile(EmbeddedCacheServer.scala:38)
	at EmbeddedCacheServer$.main(EmbeddedCacheServer.scala:26)
	at EmbeddedCacheServer.main(EmbeddedCacheServer.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)

となり、こちらはクラスタからCacheの定義そのものが削除されることになります。

というわけで、停止が発生しうるケースで、データが消失して欲しくない時はnumOwnersを

      <hash numOwners="2" />

のように、2以上に増やしたりCache Loaderを設定して永続化したりするんでしょうね。

というわけで、あまり面白いオチはありませんでした。Cache#stop時に、Entryの移動とかあるのかなぁ?と思ってみたのですが、そういうわけでもなさそうです。

Dynamically Start and Stop Clustered Cache[Library Mode]
http://infinispan.org/docs/5.3.x/user_guide/user_guide.html#_library_mode