CLOVER🍀

That was when it all began.

Infinispanで、特定のNodeに配置するキーを生成する

これまた、気になっていたAPIを。

Key affinity service
https://docs.jboss.org/author/display/ISPN/Key+affinity+service

分散したInfinispanクラスタで、特定のNodeに値が配置されるようにしたい場合に使うクラスのようです。Nodeを識別するための、クラスタ内でのアドレスを使用して、特定のノードにハッシュされるキーを生成しますよ、と。

要は、Primary Locationを決めてデータを配置するためのAPIみたですね。

準備

このAPIを使うためには、少なくともInfinispanがクラスタリングモードで設定されている必要があります(たぶん、分散モードでないとダメかも…)。

build.sbt

name := "infinispan-key-affinity-service-example"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.2"

organization := "littlewings"

fork in run := true

resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/"

libraryDependencies += "org.infinispan" % "infinispan-tree" % "5.3.0.CR1"

使用するキャッシュの名前は、「keyAffinityCache」にしました。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:5.3 http://www.infinispan.org/schemas/infinispan-config-5.3.xsd"
    xmlns="urn:infinispan:config:5.3">

  <global>
    <transport clusterName="key-affinity-cluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        />
  </global>

  <namedCache name="keyAffinityCache">
    <jmxStatistics enabled="true"/>
    <clustering mode="distribution">
      <hash numOwners="2" />
      <sync />
    </clustering>
  </namedCache>
</infinispan>

JGroupsの設定。
src/main/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd">
  <UDP
      mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}"
      mcast_port="${jgroups.udp.mcast_port:45688}"
      tos="8"
      ucast_recv_buf_size="130000"
      ucast_send_buf_size="100000"
      mcast_recv_buf_size="130000"
      mcast_send_buf_size="100000"
      loopback="true"

      thread_naming_pattern="cl"

      thread_pool.enabled="true"
      thread_pool.min_threads="2"
      thread_pool.max_threads="8"
      thread_pool.keep_alive_time="5000"
      thread_pool.queue_enabled="true"
      thread_pool.queue_max_size="1000"
      thread_pool.rejection_policy="discard"

      oob_thread_pool.enabled="true"
      oob_thread_pool.min_threads="2"
      oob_thread_pool.max_threads="8"
      oob_thread_pool.keep_alive_time="1000"
      oob_thread_pool.queue_enabled="false"
      oob_thread_pool.rejection_policy="discard"
      />

  <PING />
  <FD_ALL />
  <FD_SOCK />
  <UNICAST2 />
  <pbcast.NAKACK2 />
  <pbcast.GMS print_local_addr="true" />
</config>

基本的な使い方

だいたい、こんな感じです。

とりあえず、普通にCacheまで作成します。

val manager = new DefaultCacheManager("infinispan.xml")
val cache = manager.getCache[AnyRef, String]("keyAffinityCache")

次に、JDKのExecutorを用意して、KeyAffinityServiceFactoryクラスのstaticメソッドからKeyAffinityServiceクラスのインスタンスを取得します。

val executor = Executors.newSingleThreadExecutor
val keyAffinityService =
  KeyAffinityServiceFactory.newLocalKeyAffinityService(cache,
                                                       new RndKeyGenerator,
                                                       executor,
                                                       100)

KeyAffinityServiceFactory.newLocalKeyAffinityServicメソッドを使用した場合、第1引数はCache、第2引数はキー生成器、第3引数にExecutor、第4引数にバッファサイズです。

オーバーロードされた版もあるのですが、こちらはKeyAffinityServiceを開始するかどうかを決めることができます。

このメソッドの内部では、自分自身のアドレスを対象とするように設定します。この辺りは、また後で…。

で、キーを生成するためには、KeyAffinityService#getKeyForAddressメソッドを呼び出すわけですが、その時にはAddressが必要になります。自分自身のAddressを取得する場合は、EmbeddedCacheManager#getAddressで取得しましょう。

val address = manager.getAddress

あとは、KeyAffinityService#getKeyForAddressメソッドを呼び出すだけです。

val key = keyAffinityService.getKeyForAddress(address)

ここで渡すAddressに対して、値を配置できるキーを生成するわけです。

ちなみに、生成されるキーの型はLongです。

KeyAffinityServiceFactory.newLocalKeyAffinityServicメソッドの引数にRndKeyGeneratorクラスのインスタンスを渡していましたが、これはデフォルトで用意されているKeyGeneratorインターフェースの実装です。

KeyAffinityServiceFactoryとかKeyAffinityServiceはいかにもCacheの型パラメータでジェネリックな感じですが、そんなことありません。

そういうの使いたかったら、用途に応じて自分で用意するんでしょうね。

ちなみに、RndKeyGeneratorのキーの生成方法は、ただのjava.util.Randomです…。

最後、KeyAffinityServiceを生成するのに使用した、Executorは自分でシャットダウンする必要があります。

executor.shutdown()

このExecutorがあるため、キーは非同期で生成されます。そのバッファサイズが、生成時のパラメータというわけですね。

KeyAffinityServiceは、Lifecycleインターフェースのstart/stopメソッドを実装しているため、任意に起動停止することができます。まあ、EmbeddedCacheManagerをシャットダウンした場合は、一緒に止まってくれるようですが。

トポロジが変わった場合は、どうなるんだ?(微妙に英訳…)

(Topology changesより)
トポロジが変わった場合は、KeyAffinityServiceからのキーの所有者の変更があるかもしれません。KeyAffinityServiceは、トポロジに変更があった場合はそれを追跡し続け、陳腐化したキーを返却しないようにします。例えば、キーがある特定のNodeから、別のNodeにマップされてしまった場合など。

しかし、これはキーが使用される時に、そのNodeが変わっていないことを保証するものではありません。

例えば、

  • スレッドT1がNode Aにマップされたキーk1を読み取る
  • トポロジに変更があり、キーk1はNode Bにマップされる
  • スレッドT1はキーk1と何かを、Cacheに加える。この時、キーk1はNode Bにマップされており、異なるNodeが読み取りリクエストを行う

この動作は理想的ではありませんが、クラスタの変更があっても既に使用されている全てのキーに対しては、アプリケーションのための振る舞いをサポートするべきでしょう。
*ちょっと意味がわかりません…
KeyAffinityServiceは、トポロジの変更が当てはまらないような、安定したクラスタに対して近いアクセスでの最適化を提供します。

…だそうで。なんか英訳微妙ですが、つまり、トポロジに変更があるような環境で使うな、と。

まーそうですよね、途中でクラスタメンバ変わったりすると、場合によってはエラーになっちゃうだろうし(後述のfilterを使ったりする場合は、特に)。

う〜ん…。

まあ、いいや。

その他

あと、APIをもうちょっと深堀り。KeyAffinityServiceFactoryクラスには、他にnewKeyAffinityServiceというファクトリメソッドがオーバーロードされて定義してあります。

このうち、

Collection<Address> filter

を引数に取らないものとそうでないもので、生成されるインスタンスの振る舞いが変わります。

ここで指定するフィルタは、KeyAffinityServiceで対象とするNodeのAddressをCollectionとして渡します。すると、KeyAffinityService#getKeyForAddressメソッドの引数に、ここで指定したAdressが渡せるようになります。要は、自分自身以外のNodeに特化したキーを生成できるということです。

ただ、filterに渡すAddressは、もちろんクラスタのメンバである必要がありますが。

例えば、こんな感じでKeyAffinityServiceを生成すると、クラスタの全メンバを対象にできるようになります。

val executor = Executors.newSingleThreadExecutor
val addresses = cache.getAdvancedCache.getRpcManager.getMembers
val keyAffinityService =
  KeyAffinityServiceFactory.newKeyAffinityService(cache,
                                                  addresses,
                                                  new RndKeyGenerator,
                                                  executor,
                                                  100)

あとは、普通に使えばOKです。例えば、以下では0から5までで、各Nodeに割り振るキーを生成しています。

for (i <- 0 to 5) {
  val addr = addresses.get(i % addresses.size)
  val key = keyAffinityService.getKeyForAddress(addr)
  cache.put(key, "1")
}

なお、filterで指定した以外のAddressを渡すと、「そんなメンバークラスタにいないよ」とIllegalStateExceptionが投げられます。KeyAffinityServiceFactory.newLocalKeyAffinityServiceメソッドでKeyAffinityServiceのインスタンスを生成した場合は、自Nodeのみを対象にしたKeyAffinityServiceを生成します。

filter指定なしのKeyAffinityServiceFactory.newKeyAffinityServiceメソッドを使用した場合は、Nodeのチェックは行わなくなるみたいです。

そして、KeyAffinityService#getCollocatedKey(key)というメソッド。こちらは、指定したキーから、そのキーのPrimary Locationを探し、そのAddressから再度getKeyForAddressを呼び出してキーを生成するメソッドです。

ちなみに、KeyAffinityServiceがどうやって特定のNodeに配置されるようなキーを生成できているかというと、先にキーを生成した時に振られるAddressを算出しているからみたいです。
KeyAffinityServiceの実装、KeyAffinityServiceImplより。

   private Address getAddressForKey(Object key) {
      DistributionManager distributionManager = getDistributionManager();
      ConsistentHash hash = distributionManager.getConsistentHash();
      return hash.locatePrimaryOwner(key);
   }

説明する時はだいぶ端折っちゃいましたけど、最後に動作確認をしていたコードを。DistributionManagerを使うと、キーがどのNodeに配置されているかがわかるので、それで確認していました。
src/main/scala/KeyAffinityServiceExample.scala

import scala.collection.JavaConverters._

import java.util.concurrent.Executors

import org.infinispan.Cache
import org.infinispan.affinity.{KeyAffinityServiceFactory, RndKeyGenerator}
import org.infinispan.manager.DefaultCacheManager

object KeyAffinityServiceExample {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[AnyRef, String]("keyAffinityCache")

    try {
      val distributionManager = cache.getAdvancedCache.getDistributionManager

      val localKeyExecutor = Executors.newSingleThreadExecutor
      val localKeyAffinityService =
        KeyAffinityServiceFactory.newLocalKeyAffinityService(cache,
                                                             new RndKeyGenerator,
                                                             localKeyExecutor,
                                                             100)

      val address = manager.getAddress

      for (i <- 0 to 5) {
        val key = localKeyAffinityService.getKeyForAddress(address)
        cache.put(key, "1")
        println(cache.getAdvancedCache.getDistributionManager.getConsistentHash.locatePrimaryOwner(key))
        println(s"Generated KeyForAddress => value[$key], type[${key.getClass}]")
        println(localKeyAffinityService.getCollocatedKey(key))
      }

      localKeyExecutor.shutdown()

      for (key <- cache.keySet.asScala) {
        println(s"Key[$key], Primary Location[${distributionManager.getPrimaryLocation(key)}]")
      }

      val executor = Executors.newSingleThreadExecutor
      val addresses = cache.getAdvancedCache.getRpcManager.getMembers
      val keyAffinityService =
        KeyAffinityServiceFactory.newKeyAffinityService(cache,
                                                        addresses,
                                                        new RndKeyGenerator,
                                                        executor,
                                                        100)

      for (i <- 0 to 5) {
        val addr = addresses.get(i % addresses.size)
        val key = keyAffinityService.getKeyForAddress(addr)
        cache.put(key, "1")
        println(s"Generated KeyForAddress => value[$key], type[${key.getClass}]")
        println(s"Use Address[$addr], Primary Location[${distributionManager.getPrimaryLocation(key)}]" +
                " " + s"Locate[${distributionManager.locate(key)}]")
        val collocatedKey = keyAffinityService.getCollocatedKey(key)
        cache.put(collocatedKey, "1")
        println(s"CollocatedKey[$collocatedKey], Primary Location[${distributionManager.getPrimaryLocation(collocatedKey)}]")
      }

      executor.shutdown()
    } finally {
      cache.stop()
      manager.stop()
    }
  }
}