InfinispanのMap Reduceについて見ていて、「どうやって自ノードのキーだけを対象にしてるのかな?」と思ったことがきっかけで、その実装を調べてみました。
結論からいくと、DistributionManagerを使えばいいみたいです。
http://docs.jboss.org/infinispan/5.3/apidocs/org/infinispan/distribution/DistributionManager.html
DistributionManagerは、Cache#getAdvancedCache#getDistributionManagerで取得することができます。
val manager = new DefaultCacheManager("...") val cache = manager.getCache[String, String] val dm = cache.getAdvancedCache.getDistributionManager
自ノードのアドレスを確認するには、DistributionManager#getAddressを使用します。
manager.getAddress
ここで取得できるAddressは、JGroupsのログなどで確認できる
[info] ------------------------------------------------------------------- [info] GMS: address=ubuntu-26888, cluster=dmCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:46265 [info] -------------------------------------------------------------------
のaddressの部分(この例では「ubuntu-26888」)が取得できます。
クラスタへの参加が完了したかどうかを確認するには、DistributionManager#isJoinCompleteを使用します。
dm.isJoinComplete
Rehash中なのかを確認するには、DistributionManager#isRehashInProgress。
dm.isRehashInProgress
ConsistentHashのインスタンスを取得することもできます。
dm.getConsistentHash
ここから先は、キー単位で呼び出すメソッドになります。
指定したキーに対するエントリがローカルにあるかそうでないかを確認するには、DistributionManager#getLocality(Object key)を使用します。
dm.getLocality(key)
戻り値はDataLocalityというEnumです。
http://docs.jboss.org/infinispan/5.3/apidocs/org/infinispan/distribution/DataLocality.html
インスタンスの種類としては、
- Local(ローカル)
- LOCAL_UNCERTAIN(ローカルだが未確定)
- NOT_LOCAL(別ノードが保持)
- NOT_LOCAL_UNCERTAIN(別ノードだが未確定)
というのがあります。
Uncertainというのは
Uncertainty indicates a rehash is in progress and the locality of key in question may be in flux.
http://docs.jboss.org/infinispan/5.3/apidocs/org/infinispan/distribution/DataLocality.html
というように、Rehash中のため未確定なことを示しているようです。
キーに対するプライマリノードを確認するには、DistributionManager#getPrimaryLocation(Object key)を使用します。
dm.getPrimaryLocation(key)
戻り値はAddressです。
Infinispan MapReduceでは、DistributionManager#getPrimaryLocation(Object key)とEmbeddedCacheManager#getAddressを使用することで、自ノードがプライマリであるキーのみを処理するようにしています。
// org.infinispan.distexec.mapreduce.MapReduceManagerImplより // @Inject // public void init(EmbeddedCacheManager cacheManager, CacheLoaderManager cacheLoaderManager, // @ComponentName(ASYNC_TRANSPORT_EXECUTOR) ExecutorService asyncTransportExecutor) { // より抜粋 this.localAddress = cacheManager.getAddress(); // protected <KIn> Set<KIn> filterLocalPrimaryOwner(Set<KIn> nodeLocalKeys, DistributionManager dm) { // より抜粋 Address primaryLocation = dm.getPrimaryLocation(key); if (primaryLocation != null && primaryLocation.equals(localAddress)) { // これがtrueになれば、自ノードがプライマリ
ただ、処理対象の全キーはそもそもどうやって取得してるんだろうと思ったら、Cache#keySetでした…。
キーに対するデータを持つAddressの一覧は、DistributionManager#locate(Object key)で取得することができます。
dm.locate(key)
戻り値はList<Address>です。
あと、DistributionManagerからは外れますが、現在のクラスタのメンバー一覧を取得するには、RpcManagerを使用するとよいみたいです。
val rpcManager = cache.getAdvancedCache.getRpcManager println("All Cluster Members => " + rpcManager.getMembers)
RpcManagerは、Cache#getAdvancedCache#getRpcManagerで取得できます。
ちなみに、DistributionManagerもRpcManagerもそうですが、中のコードを追いかけていくとStateTransferManagerにつながり、最後はCacheTopologyに行き着くことになります。
http://docs.jboss.org/infinispan/5.3/apidocs/org/infinispan/topology/CacheTopology.html
最後に、実際に使っていってみましょう。
まずは準備。
build.sbt
name := "infinispan-distribution-manager" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.1" organization := "littlewings" fork in run := true libraryDependencies += "org.infinispan" % "infinispan-core" % "5.3.0.CR1"
5.3.0もCR1が出ましたね〜。
src/main/resources/jgroups.xml
<?xml version="1.0" encoding="UTF-8"?> <config xmlns="urn:org:jgroups" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd"> <UDP mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}" mcast_port="${jgroups.udp.mcast_port:45688}" tos="8" ucast_recv_buf_size="130000" ucast_send_buf_size="100000" mcast_recv_buf_size="130000" mcast_send_buf_size="100000" loopback="true" thread_naming_pattern="cl" thread_pool.enabled="true" thread_pool.min_threads="2" thread_pool.max_threads="8" thread_pool.keep_alive_time="5000" thread_pool.queue_enabled="true" thread_pool.queue_max_size="1000" thread_pool.rejection_policy="discard" oob_thread_pool.enabled="true" oob_thread_pool.min_threads="2" oob_thread_pool.max_threads="8" oob_thread_pool.keep_alive_time="1000" oob_thread_pool.queue_enabled="false" oob_thread_pool.rejection_policy="discard" /> <PING /> <FD_ALL /> <FD_SOCK /> <UNICAST2 /> <pbcast.NAKACK2 /> <pbcast.GMS print_local_addr="true" /> </config>
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:5.3 http://www.infinispan.org/schemas/infinispan-config-5.3.xsd" xmlns="urn:infinispan:config:5.3"> <global> <transport clusterName="dmCluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" /> </global> <default> <jmxStatistics enabled="true"/> <clustering mode="distribution"> <hash numOwners="2" /> <sync /> </clustering> </default> </infinispan>
クラスタリングのモードは分散で、データは2つコピーを持つようにしています。
今回は、クラスタを4つで構成しようと思います。
ただ浮いていてもらうだけのCacheServerを定義します。
src/main/scala/EmbeddedCacheServer.scala
import org.infinispan.manager.DefaultCacheManager object EmbeddedCacheServer { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[AnyRef, AnyRef] } }
この何もしないインスタンスを、先に3つ起動しておきます。
### 1つ目 $ sbt "run-main EmbeddedCacheServer" [info] Set current project to infinispan-distribution-manager (in build file:/xxxxx/infinispan-distribution-manager/) [info] Running EmbeddedCacheServer [error] 6 01, 2013 5:10:06 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start [error] INFO: ISPN000078: Starting JGroups Channel [info] [info] ------------------------------------------------------------------- [info] GMS: address=ubuntu-26888, cluster=dmCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:46265 [info] ------------------------------------------------------------------- ### 2つ目 $ sbt "run-main EmbeddedCacheServer" [info] Set current project to infinispan-distribution-manager (in build file:/xxxxx/infinispan-distribution-manager/) [info] Running EmbeddedCacheServer [error] 6 01, 2013 5:10:09 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start [error] INFO: ISPN000078: Starting JGroups Channel [info] [info] ------------------------------------------------------------------- [info] GMS: address=ubuntu-14786, cluster=dmCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:44720 [info] ------------------------------------------------------------------- ### 3つ目 $ sbt "run-main EmbeddedCacheServer" [info] Set current project to infinispan-distribution-manager (in build file:/xxxxx/infinispan-distribution-manager/) [info] Running EmbeddedCacheServer [error] 6 01, 2013 5:10:13 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start [error] INFO: ISPN000078: Starting JGroups Channel [info] [info] ------------------------------------------------------------------- [info] GMS: address=ubuntu-40825, cluster=dmCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:52067 [info] -------------------------------------------------------------------
あとは、DistributionManagerを使う方です。
src/main/scala/InfinispanDistributionManagerTest.scala
import org.infinispan.manager.DefaultCacheManager object InfinispanDistributionManagerTest { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[String, String] val dm = cache.getAdvancedCache.getDistributionManager try { println("Local Address => " + manager.getAddress) println("Join Complete => " + dm.isJoinComplete) println("Rehash In Progress => " + dm.isRehashInProgress) println("Consistent Hash => " + dm.getConsistentHash) val (keys, values) = (1 to 100).map(i => (s"key$i", s"value$i")).unzip keys.zip(values).foreach { case (k, v) => cache.put(k, v) } keys.foreach { key => println("Key[" + key + "]:" + System.lineSeparator + "\t\tPrimary Location => " + dm.getPrimaryLocation(key) + "," + System.lineSeparator + "\t\tLocality => " + dm.getLocality(key) + "," + System.lineSeparator + "\t\tLocate => " + dm.locate(key)) } val rpcManager = cache.getAdvancedCache.getRpcManager println("All Cluster Members => " + rpcManager.getMembers) } finally { cache.stop() manager.stop() } } }
最初に
println("Local Address => " + manager.getAddress) println("Join Complete => " + dm.isJoinComplete) println("Rehash In Progress => " + dm.isRehashInProgress) println("Consistent Hash => " + dm.getConsistentHash)
とクラスタの情報を表示して、
val (keys, values) = (1 to 100).map(i => (s"key$i", s"value$i")).unzip keys.zip(values).foreach { case (k, v) => cache.put(k, v) }
キャッシュにエントリを100個登録します。その後、キーに対するプライマリノード、データのローカリティ、データの保持ノードのリストを表示して
keys.foreach { key => println("Key[" + key + "]:" + System.lineSeparator + "\t\tPrimary Location => " + dm.getPrimaryLocation(key) + "," + System.lineSeparator + "\t\tLocality => " + dm.getLocality(key) + "," + System.lineSeparator + "\t\tLocate => " + dm.locate(key)) }
val rpcManager = cache.getAdvancedCache.getRpcManager println("All Cluster Members => " + rpcManager.getMembers)
では、実行。
起動して
> run-main InfinispanDistributionManagerTest [info] Running InfinispanDistributionManagerTest [error] 6 01, 2013 5:44:10 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start [error] INFO: ISPN000078: Starting JGroups Channel [info] [info] ------------------------------------------------------------------- [info] GMS: address=ubuntu-7474, cluster=dmCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:38215 [info] -------------------------------------------------------------------
あ、ノード名は「ubuntu-7474」ですね。
[info] Local Address => ubuntu-7474 [info] Join Complete => true [info] Rehash In Progress => false [info] Consistent Hash => DefaultConsistentHash{numSegments=60, numOwners=2, members=[ubuntu-26888, ubuntu-14786, ubuntu-40825, ubuntu-7474]}
クラスタへの参加が完了したこと、Rehash中ではないことがわかります。
あとは、キーに対する情報が並びますが、100個並べても仕方がないので、いくつか抜粋します。
自ノードにデータがあり、かつプライマリノードであるもの。
[info] Key[key6]: [info] Primary Location => ubuntu-7474, [info] Locality => LOCAL, [info] Locate => [ubuntu-7474, ubuntu-26888]
自ノードにデータはあるが、プライマリではないもの
[info] Key[key9]: [info] Primary Location => ubuntu-14786, [info] Locality => LOCAL, [info] Locate => [ubuntu-14786, ubuntu-7474]
自ノードにデータがないもの。
[info] Key[key10]: [info] Primary Location => ubuntu-14786, [info] Locality => NOT_LOCAL, [info] Locate => [ubuntu-14786, ubuntu-40825]
[info] All Cluster Members => [ubuntu-26888, ubuntu-14786, ubuntu-40825, ubuntu-7474]