これまた、気になっていたAPIを。
Key affinity service
https://docs.jboss.org/author/display/ISPN/Key+affinity+service
分散したInfinispanクラスタで、特定のNodeに値が配置されるようにしたい場合に使うクラスのようです。Nodeを識別するための、クラスタ内でのアドレスを使用して、特定のノードにハッシュされるキーを生成しますよ、と。
要は、Primary Locationを決めてデータを配置するためのAPIみたですね。
準備
このAPIを使うためには、少なくともInfinispanがクラスタリングモードで設定されている必要があります(たぶん、分散モードでないとダメかも…)。
build.sbt
name := "infinispan-key-affinity-service-example" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.2" organization := "littlewings" fork in run := true resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/" libraryDependencies += "org.infinispan" % "infinispan-tree" % "5.3.0.CR1"
使用するキャッシュの名前は、「keyAffinityCache」にしました。
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:5.3 http://www.infinispan.org/schemas/infinispan-config-5.3.xsd" xmlns="urn:infinispan:config:5.3"> <global> <transport clusterName="key-affinity-cluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" /> </global> <namedCache name="keyAffinityCache"> <jmxStatistics enabled="true"/> <clustering mode="distribution"> <hash numOwners="2" /> <sync /> </clustering> </namedCache> </infinispan>
JGroupsの設定。
src/main/resources/jgroups.xml
<?xml version="1.0" encoding="UTF-8"?> <config xmlns="urn:org:jgroups" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd"> <UDP mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}" mcast_port="${jgroups.udp.mcast_port:45688}" tos="8" ucast_recv_buf_size="130000" ucast_send_buf_size="100000" mcast_recv_buf_size="130000" mcast_send_buf_size="100000" loopback="true" thread_naming_pattern="cl" thread_pool.enabled="true" thread_pool.min_threads="2" thread_pool.max_threads="8" thread_pool.keep_alive_time="5000" thread_pool.queue_enabled="true" thread_pool.queue_max_size="1000" thread_pool.rejection_policy="discard" oob_thread_pool.enabled="true" oob_thread_pool.min_threads="2" oob_thread_pool.max_threads="8" oob_thread_pool.keep_alive_time="1000" oob_thread_pool.queue_enabled="false" oob_thread_pool.rejection_policy="discard" /> <PING /> <FD_ALL /> <FD_SOCK /> <UNICAST2 /> <pbcast.NAKACK2 /> <pbcast.GMS print_local_addr="true" /> </config>
基本的な使い方
だいたい、こんな感じです。
とりあえず、普通にCacheまで作成します。
val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[AnyRef, String]("keyAffinityCache")
次に、JDKのExecutorを用意して、KeyAffinityServiceFactoryクラスのstaticメソッドからKeyAffinityServiceクラスのインスタンスを取得します。
val executor = Executors.newSingleThreadExecutor val keyAffinityService = KeyAffinityServiceFactory.newLocalKeyAffinityService(cache, new RndKeyGenerator, executor, 100)
KeyAffinityServiceFactory.newLocalKeyAffinityServicメソッドを使用した場合、第1引数はCache、第2引数はキー生成器、第3引数にExecutor、第4引数にバッファサイズです。
オーバーロードされた版もあるのですが、こちらはKeyAffinityServiceを開始するかどうかを決めることができます。
このメソッドの内部では、自分自身のアドレスを対象とするように設定します。この辺りは、また後で…。
で、キーを生成するためには、KeyAffinityService#getKeyForAddressメソッドを呼び出すわけですが、その時にはAddressが必要になります。自分自身のAddressを取得する場合は、EmbeddedCacheManager#getAddressで取得しましょう。
val address = manager.getAddress
あとは、KeyAffinityService#getKeyForAddressメソッドを呼び出すだけです。
val key = keyAffinityService.getKeyForAddress(address)
ここで渡すAddressに対して、値を配置できるキーを生成するわけです。
ちなみに、生成されるキーの型はLongです。
KeyAffinityServiceFactory.newLocalKeyAffinityServicメソッドの引数にRndKeyGeneratorクラスのインスタンスを渡していましたが、これはデフォルトで用意されているKeyGeneratorインターフェースの実装です。
KeyAffinityServiceFactoryとかKeyAffinityServiceはいかにもCacheの型パラメータでジェネリックな感じですが、そんなことありません。
そういうの使いたかったら、用途に応じて自分で用意するんでしょうね。
ちなみに、RndKeyGeneratorのキーの生成方法は、ただのjava.util.Randomです…。
最後、KeyAffinityServiceを生成するのに使用した、Executorは自分でシャットダウンする必要があります。
executor.shutdown()
このExecutorがあるため、キーは非同期で生成されます。そのバッファサイズが、生成時のパラメータというわけですね。
KeyAffinityServiceは、Lifecycleインターフェースのstart/stopメソッドを実装しているため、任意に起動停止することができます。まあ、EmbeddedCacheManagerをシャットダウンした場合は、一緒に止まってくれるようですが。
トポロジが変わった場合は、どうなるんだ?(微妙に英訳…)
(Topology changesより)
トポロジが変わった場合は、KeyAffinityServiceからのキーの所有者の変更があるかもしれません。KeyAffinityServiceは、トポロジに変更があった場合はそれを追跡し続け、陳腐化したキーを返却しないようにします。例えば、キーがある特定のNodeから、別のNodeにマップされてしまった場合など。
しかし、これはキーが使用される時に、そのNodeが変わっていないことを保証するものではありません。
例えば、
- スレッドT1がNode Aにマップされたキーk1を読み取る
- トポロジに変更があり、キーk1はNode Bにマップされる
- スレッドT1はキーk1と何かを、Cacheに加える。この時、キーk1はNode Bにマップされており、異なるNodeが読み取りリクエストを行う
この動作は理想的ではありませんが、クラスタの変更があっても既に使用されている全てのキーに対しては、アプリケーションのための振る舞いをサポートするべきでしょう。
*ちょっと意味がわかりません…
KeyAffinityServiceは、トポロジの変更が当てはまらないような、安定したクラスタに対して近いアクセスでの最適化を提供します。
…だそうで。なんか英訳微妙ですが、つまり、トポロジに変更があるような環境で使うな、と。
まーそうですよね、途中でクラスタメンバ変わったりすると、場合によってはエラーになっちゃうだろうし(後述のfilterを使ったりする場合は、特に)。
う〜ん…。
まあ、いいや。
その他
あと、APIをもうちょっと深堀り。KeyAffinityServiceFactoryクラスには、他にnewKeyAffinityServiceというファクトリメソッドがオーバーロードされて定義してあります。
このうち、
Collection<Address> filter
を引数に取らないものとそうでないもので、生成されるインスタンスの振る舞いが変わります。
ここで指定するフィルタは、KeyAffinityServiceで対象とするNodeのAddressをCollectionとして渡します。すると、KeyAffinityService#getKeyForAddressメソッドの引数に、ここで指定したAdressが渡せるようになります。要は、自分自身以外のNodeに特化したキーを生成できるということです。
ただ、filterに渡すAddressは、もちろんクラスタのメンバである必要がありますが。
例えば、こんな感じでKeyAffinityServiceを生成すると、クラスタの全メンバを対象にできるようになります。
val executor = Executors.newSingleThreadExecutor val addresses = cache.getAdvancedCache.getRpcManager.getMembers val keyAffinityService = KeyAffinityServiceFactory.newKeyAffinityService(cache, addresses, new RndKeyGenerator, executor, 100)
あとは、普通に使えばOKです。例えば、以下では0から5までで、各Nodeに割り振るキーを生成しています。
for (i <- 0 to 5) { val addr = addresses.get(i % addresses.size) val key = keyAffinityService.getKeyForAddress(addr) cache.put(key, "1") }
なお、filterで指定した以外のAddressを渡すと、「そんなメンバー、クラスタにいないよ」とIllegalStateExceptionが投げられます。KeyAffinityServiceFactory.newLocalKeyAffinityServiceメソッドでKeyAffinityServiceのインスタンスを生成した場合は、自Nodeのみを対象にしたKeyAffinityServiceを生成します。
filter指定なしのKeyAffinityServiceFactory.newKeyAffinityServiceメソッドを使用した場合は、Nodeのチェックは行わなくなるみたいです。
そして、KeyAffinityService#getCollocatedKey(key)というメソッド。こちらは、指定したキーから、そのキーのPrimary Locationを探し、そのAddressから再度getKeyForAddressを呼び出してキーを生成するメソッドです。
ちなみに、KeyAffinityServiceがどうやって特定のNodeに配置されるようなキーを生成できているかというと、先にキーを生成した時に振られるAddressを算出しているからみたいです。
KeyAffinityServiceの実装、KeyAffinityServiceImplより。
private Address getAddressForKey(Object key) { DistributionManager distributionManager = getDistributionManager(); ConsistentHash hash = distributionManager.getConsistentHash(); return hash.locatePrimaryOwner(key); }
説明する時はだいぶ端折っちゃいましたけど、最後に動作確認をしていたコードを。DistributionManagerを使うと、キーがどのNodeに配置されているかがわかるので、それで確認していました。
src/main/scala/KeyAffinityServiceExample.scala
import scala.collection.JavaConverters._ import java.util.concurrent.Executors import org.infinispan.Cache import org.infinispan.affinity.{KeyAffinityServiceFactory, RndKeyGenerator} import org.infinispan.manager.DefaultCacheManager object KeyAffinityServiceExample { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[AnyRef, String]("keyAffinityCache") try { val distributionManager = cache.getAdvancedCache.getDistributionManager val localKeyExecutor = Executors.newSingleThreadExecutor val localKeyAffinityService = KeyAffinityServiceFactory.newLocalKeyAffinityService(cache, new RndKeyGenerator, localKeyExecutor, 100) val address = manager.getAddress for (i <- 0 to 5) { val key = localKeyAffinityService.getKeyForAddress(address) cache.put(key, "1") println(cache.getAdvancedCache.getDistributionManager.getConsistentHash.locatePrimaryOwner(key)) println(s"Generated KeyForAddress => value[$key], type[${key.getClass}]") println(localKeyAffinityService.getCollocatedKey(key)) } localKeyExecutor.shutdown() for (key <- cache.keySet.asScala) { println(s"Key[$key], Primary Location[${distributionManager.getPrimaryLocation(key)}]") } val executor = Executors.newSingleThreadExecutor val addresses = cache.getAdvancedCache.getRpcManager.getMembers val keyAffinityService = KeyAffinityServiceFactory.newKeyAffinityService(cache, addresses, new RndKeyGenerator, executor, 100) for (i <- 0 to 5) { val addr = addresses.get(i % addresses.size) val key = keyAffinityService.getKeyForAddress(addr) cache.put(key, "1") println(s"Generated KeyForAddress => value[$key], type[${key.getClass}]") println(s"Use Address[$addr], Primary Location[${distributionManager.getPrimaryLocation(key)}]" + " " + s"Locate[${distributionManager.locate(key)}]") val collocatedKey = keyAffinityService.getCollocatedKey(key) cache.put(collocatedKey, "1") println(s"CollocatedKey[$collocatedKey], Primary Location[${distributionManager.getPrimaryLocation(collocatedKey)}]") } executor.shutdown() } finally { cache.stop() manager.stop() } } }