CLOVER🍀

That was when it all began.

InfinispanのDistributionManagerを使って、キャッシュエントリの保持情報を確認する

InfinispanのMap Reduceについて見ていて、「どうやって自ノードのキーだけを対象にしてるのかな?」と思ったことがきっかけで、その実装を調べてみました。

結論からいくと、DistributionManagerを使えばいいみたいです。
http://docs.jboss.org/infinispan/5.3/apidocs/org/infinispan/distribution/DistributionManager.html

DistributionManagerは、Cache#getAdvancedCache#getDistributionManagerで取得することができます。

    val manager = new DefaultCacheManager("...")
    val cache = manager.getCache[String, String]

    val dm = cache.getAdvancedCache.getDistributionManager

自ノードのアドレスを確認するには、DistributionManager#getAddressを使用します。

manager.getAddress

ここで取得できるAddressは、JGroupsのログなどで確認できる

[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-26888, cluster=dmCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:46265
[info] -------------------------------------------------------------------

のaddressの部分(この例では「ubuntu-26888」)が取得できます。

クラスタへの参加が完了したかどうかを確認するには、DistributionManager#isJoinCompleteを使用します。

dm.isJoinComplete

Rehash中なのかを確認するには、DistributionManager#isRehashInProgress。

dm.isRehashInProgress

ConsistentHashのインスタンスを取得することもできます。

dm.getConsistentHash

ここから先は、キー単位で呼び出すメソッドになります。

指定したキーに対するエントリがローカルにあるかそうでないかを確認するには、DistributionManager#getLocality(Object key)を使用します。

dm.getLocality(key)

戻り値はDataLocalityというEnumです。
http://docs.jboss.org/infinispan/5.3/apidocs/org/infinispan/distribution/DataLocality.html

インスタンスの種類としては、

  • Local(ローカル)
  • LOCAL_UNCERTAIN(ローカルだが未確定)
  • NOT_LOCAL(別ノードが保持)
  • NOT_LOCAL_UNCERTAIN(別ノードだが未確定)

というのがあります。

Uncertainというのは

Uncertainty indicates a rehash is in progress and the locality of key in question may be in flux.

http://docs.jboss.org/infinispan/5.3/apidocs/org/infinispan/distribution/DataLocality.html

というように、Rehash中のため未確定なことを示しているようです。

キーに対するプライマリノードを確認するには、DistributionManager#getPrimaryLocation(Object key)を使用します。

dm.getPrimaryLocation(key)

戻り値はAddressです。

Infinispan MapReduceでは、DistributionManager#getPrimaryLocation(Object key)とEmbeddedCacheManager#getAddressを使用することで、自ノードがプライマリであるキーのみを処理するようにしています。

// org.infinispan.distexec.mapreduce.MapReduceManagerImplより

//   @Inject
//   public void init(EmbeddedCacheManager cacheManager, CacheLoaderManager cacheLoaderManager,
//            @ComponentName(ASYNC_TRANSPORT_EXECUTOR) ExecutorService asyncTransportExecutor) {
// より抜粋
      this.localAddress = cacheManager.getAddress();

// protected <KIn> Set<KIn> filterLocalPrimaryOwner(Set<KIn> nodeLocalKeys, DistributionManager dm) {
// より抜粋
         Address primaryLocation = dm.getPrimaryLocation(key);
         if (primaryLocation != null && primaryLocation.equals(localAddress)) {
             // これがtrueになれば、自ノードがプライマリ

ただ、処理対象の全キーはそもそもどうやって取得してるんだろうと思ったら、Cache#keySetでした…。

キーに対するデータを持つAddressの一覧は、DistributionManager#locate(Object key)で取得することができます。

dm.locate(key)

戻り値はList<Address>です。

あと、DistributionManagerからは外れますが、現在のクラスタのメンバー一覧を取得するには、RpcManagerを使用するとよいみたいです。

val rpcManager = cache.getAdvancedCache.getRpcManager
println("All Cluster Members => " + rpcManager.getMembers)

RpcManagerは、Cache#getAdvancedCache#getRpcManagerで取得できます。

ちなみに、DistributionManagerもRpcManagerもそうですが、中のコードを追いかけていくとStateTransferManagerにつながり、最後はCacheTopologyに行き着くことになります。
http://docs.jboss.org/infinispan/5.3/apidocs/org/infinispan/topology/CacheTopology.html


最後に、実際に使っていってみましょう。

まずは準備。
build.sbt

name := "infinispan-distribution-manager"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.1"

organization := "littlewings"

fork in run := true

libraryDependencies += "org.infinispan" % "infinispan-core" % "5.3.0.CR1"

5.3.0もCR1が出ましたね〜。

src/main/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd">
  <UDP
      mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}"
      mcast_port="${jgroups.udp.mcast_port:45688}"
      tos="8"
      ucast_recv_buf_size="130000"
      ucast_send_buf_size="100000"
      mcast_recv_buf_size="130000"
      mcast_send_buf_size="100000"
      loopback="true"

      thread_naming_pattern="cl"

      thread_pool.enabled="true"
      thread_pool.min_threads="2"
      thread_pool.max_threads="8"
      thread_pool.keep_alive_time="5000"
      thread_pool.queue_enabled="true"
      thread_pool.queue_max_size="1000"
      thread_pool.rejection_policy="discard"

      oob_thread_pool.enabled="true"
      oob_thread_pool.min_threads="2"
      oob_thread_pool.max_threads="8"
      oob_thread_pool.keep_alive_time="1000"
      oob_thread_pool.queue_enabled="false"
      oob_thread_pool.rejection_policy="discard"
      />

  <PING />
  <FD_ALL />
  <FD_SOCK />
  <UNICAST2 />
  <pbcast.NAKACK2 />
  <pbcast.GMS print_local_addr="true" />
</config>

src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:5.3 http://www.infinispan.org/schemas/infinispan-config-5.3.xsd"
    xmlns="urn:infinispan:config:5.3">

  <global>
    <transport clusterName="dmCluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        />
  </global>

  <default>
    <jmxStatistics enabled="true"/>
    <clustering mode="distribution">
      <hash numOwners="2" />
      <sync />
    </clustering>
  </default>
</infinispan>

クラスタリングのモードは分散で、データは2つコピーを持つようにしています。

今回は、クラスタを4つで構成しようと思います。

ただ浮いていてもらうだけのCacheServerを定義します。
src/main/scala/EmbeddedCacheServer.scala

import org.infinispan.manager.DefaultCacheManager

object EmbeddedCacheServer {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[AnyRef, AnyRef]
  }
}

この何もしないインスタンスを、先に3つ起動しておきます。

### 1つ目
$ sbt "run-main EmbeddedCacheServer"
[info] Set current project to infinispan-distribution-manager (in build file:/xxxxx/infinispan-distribution-manager/)
[info] Running EmbeddedCacheServer 
[error] 6 01, 2013 5:10:06 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start
[error] INFO: ISPN000078: Starting JGroups Channel
[info] 
[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-26888, cluster=dmCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:46265
[info] -------------------------------------------------------------------

### 2つ目
$ sbt "run-main EmbeddedCacheServer"
[info] Set current project to infinispan-distribution-manager (in build file:/xxxxx/infinispan-distribution-manager/)
[info] Running EmbeddedCacheServer 
[error] 6 01, 2013 5:10:09 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start
[error] INFO: ISPN000078: Starting JGroups Channel
[info] 
[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-14786, cluster=dmCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:44720
[info] -------------------------------------------------------------------

### 3つ目
$ sbt "run-main EmbeddedCacheServer"
[info] Set current project to infinispan-distribution-manager (in build file:/xxxxx/infinispan-distribution-manager/)
[info] Running EmbeddedCacheServer 
[error] 6 01, 2013 5:10:13 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start
[error] INFO: ISPN000078: Starting JGroups Channel
[info] 
[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-40825, cluster=dmCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:52067
[info] -------------------------------------------------------------------

あとは、DistributionManagerを使う方です。
src/main/scala/InfinispanDistributionManagerTest.scala

import org.infinispan.manager.DefaultCacheManager

object InfinispanDistributionManagerTest {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]

    val dm = cache.getAdvancedCache.getDistributionManager

    try {
      println("Local Address => " + manager.getAddress)

      println("Join Complete => " + dm.isJoinComplete)

      println("Rehash In Progress => " + dm.isRehashInProgress)

      println("Consistent Hash => " + dm.getConsistentHash)

      val (keys, values) =
        (1 to 100).map(i => (s"key$i", s"value$i")).unzip

      keys.zip(values).foreach { case (k, v) => cache.put(k, v) }

      keys.foreach { key =>
        println("Key[" + key + "]:" + System.lineSeparator +
                "\t\tPrimary Location => " + dm.getPrimaryLocation(key) + "," + System.lineSeparator +
                "\t\tLocality => " + dm.getLocality(key) + "," + System.lineSeparator +
                "\t\tLocate => " + dm.locate(key))
      }

      val rpcManager = cache.getAdvancedCache.getRpcManager
      println("All Cluster Members => " + rpcManager.getMembers)
    } finally {
      cache.stop()
      manager.stop()
    }
  }
}

最初に

      println("Local Address => " + manager.getAddress)

      println("Join Complete => " + dm.isJoinComplete)

      println("Rehash In Progress => " + dm.isRehashInProgress)

      println("Consistent Hash => " + dm.getConsistentHash)

クラスタの情報を表示して、

      val (keys, values) =
        (1 to 100).map(i => (s"key$i", s"value$i")).unzip

      keys.zip(values).foreach { case (k, v) => cache.put(k, v) }

キャッシュにエントリを100個登録します。その後、キーに対するプライマリノード、データのローカリティ、データの保持ノードのリストを表示して

      keys.foreach { key =>
        println("Key[" + key + "]:" + System.lineSeparator +
                "\t\tPrimary Location => " + dm.getPrimaryLocation(key) + "," + System.lineSeparator +
                "\t\tLocality => " + dm.getLocality(key) + "," + System.lineSeparator +
                "\t\tLocate => " + dm.locate(key))
      }

最後にクラスタのメンバー一覧を出力して終了です。

      val rpcManager = cache.getAdvancedCache.getRpcManager
      println("All Cluster Members => " + rpcManager.getMembers)

では、実行。

起動して

> run-main InfinispanDistributionManagerTest
[info] Running InfinispanDistributionManagerTest 
[error] 6 01, 2013 5:44:10 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start
[error] INFO: ISPN000078: Starting JGroups Channel
[info] 
[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-7474, cluster=dmCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:38215
[info] -------------------------------------------------------------------

あ、ノード名は「ubuntu-7474」ですね。

[info] Local Address => ubuntu-7474
[info] Join Complete => true
[info] Rehash In Progress => false
[info] Consistent Hash => DefaultConsistentHash{numSegments=60, numOwners=2, members=[ubuntu-26888, ubuntu-14786, ubuntu-40825, ubuntu-7474]}

クラスタへの参加が完了したこと、Rehash中ではないことがわかります。

あとは、キーに対する情報が並びますが、100個並べても仕方がないので、いくつか抜粋します。

自ノードにデータがあり、かつプライマリノードであるもの。

[info] Key[key6]:
[info] 		Primary Location => ubuntu-7474,
[info] 		Locality => LOCAL,
[info] 		Locate => [ubuntu-7474, ubuntu-26888]

自ノードにデータはあるが、プライマリではないもの

[info] Key[key9]:
[info] 		Primary Location => ubuntu-14786,
[info] 		Locality => LOCAL,
[info] 		Locate => [ubuntu-14786, ubuntu-7474]

自ノードにデータがないもの。

[info] Key[key10]:
[info] 		Primary Location => ubuntu-14786,
[info] 		Locality => NOT_LOCAL,
[info] 		Locate => [ubuntu-14786, ubuntu-40825]

最後、クラスタのメンバー一覧です。

[info] All Cluster Members => [ubuntu-26888, ubuntu-14786, ubuntu-40825, ubuntu-7474]