以前Segmentsの設定の確認だけはやったことはあったのですが、もう少し理解を進めるために、復習を兼ねて遊んでみます。
7.4.2. Hashing Algorithms
http://infinispan.org/docs/6.0.x/user_guide/user_guide.html#_hashing_algorithms
以前、Segmentsの設定に関してだけは試したことがあったのですが、ここで書かれていた意味がもうちょっと頭に入ったのは他のデータグリッドを使った後でしたね。
InfinispanのSegmentsを使ってみる
http://d.hatena.ne.jp/Kazuhira/20130901/1378034880
InfinispanのSegmentsというのは、CoherenceやHazelcastでいうPartitionと考えればよさそうです。
ちなみに、以前エントリを書いた時に見ていたInfinispanのドキュメントは、サイトの移行に伴いドキュメントの内容自体が変わってしまいました。
これらの話は、Infinispanがクラスタ内で各メンバーに対して、エントリの管理をどう割り当てるかという内容みたいです。キーを割り当てる時に、一定の領域に対してそのキーを管理するように割り当てるのですが、これをセグメントと呼んでいるみたいですねぇ。
ドキュメントを読む
それでは、新しく書かれた「7.4.2. Hashing Algorithms」を頑張って読んでみます。
概要
Infinispanのハッシュアルゴリズムは、コンシステントハッシュ法をベースにしていてはいるものの、少し異なる実装になっているようです。
コンシステントハッシュ法とは異なり、固定のセグメントにキーを配置する空間を分割します。セグメントの数はnumSegmentsで設定可能ですが、クラスタの再起動なしには変更することはできません。セグメントへのキーのマッピングは、固定となります。クラスタのトポロジがどのように変わったかに関わらず、同じセグメントにキーを割り当てるのが望ましいです。
各ハッシュ・セグメントは、オーナーと呼ばれるNodeのリストにマップされます。最初のオーナーはプライマリ・オーナーとして知られ、ロックのような多くのキャッシュ操作で特別な役割を持つため、順序は重要になります。他のオーナーは、バックアップ・オーナーと呼ばれています。各オーナーに対して、セグメントをどのようにマップするかの法則が決まっているわけではありません。ハッシュアルゴリズムは一般に各Nodeに割り当てられたセグメントの数のバランスを取ろうとし、クラスタにNodeが参加したり退出した後には、セグメントの移動の数を最小にする必要があります。
Infinispanで使えるハッシュアルゴリズム
Infinispanのハッシュアルゴリズムはカスタマイズ可能で、デフォルトでは5つの実装が用意されています。
- DefaultConsistentHashFactory
- デフォルトのハッシュアルゴリズムでかなり均一の分散を実現しますが、ひとつ欠点があります。Nodeのセグメントへのマッピングは、キャッシュのクラスタへの参加した順番に依存します。よって、キーのオーナーがクラスタ内で実行されているすべてのキャッシュで同じであることが保証されていません
- TopologyAwareConsistentHashFactory
- サーバ・ヒンティング(サーバの物理配置の設定)を有効にした場合は、自動的に選ばれます。デフォルトのアルゴリズムと似ていますが、可能な限り多くのサイト、ラック、マシンの間で各セグメントのコピーを拡散します
- SyncConsistentHashFactory
- コンシステントハッシュ法によく似た別のアルゴリズムで、デフォルトのアルゴリズムの弱点に対応して、クラスタが長時間対称的になるように、あるキーがいつも各キャッシュの同じNodeに割り当てられるようにします。しかし、これ自身にも弱点はあり、負荷分散がそれほどでなくても、クラスタへのNodeの参加、退出に伴いセグメントを必要以上に移動するかもしれません
- TopologyAwareSyncConsistentHashFactory
- SyncConsistentHashFactoryによく似ていますが、サーバ・ヒンティングに対応しています
- ReplicatedConsistentHashFactory
- キャッシュがレプリケーションモードの時に内部的に使用されるアルゴリズムで、分散キャッシュのユーザが明示的に選択することはありません
試してみる
それでは、これらの設定をかいつまんで試してみましょう。
準備
まずは、Infinispanを使うために依存関係の定義。
build.sbt
name := "infinispan-hash-algorithm" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.4" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked") fork in Test := true parallelExecution in Test := false libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "6.0.2.Final" excludeAll( ExclusionRule(organization = "org.jgroups", name = "jgroups"), ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"), ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"), ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"), ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec") ), "org.jgroups" % "jgroups" % "3.4.1.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final", "org.jboss.marshalling" % "jboss-marshalling-river" % "1.4.4.Final", "org.jboss.marshalling" % "jboss-marshalling" % "1.4.4.Final", "org.jboss.logging" % "jboss-logging" % "3.1.2.GA", "net.jcip" % "jcip-annotations" % "1.0", "org.scalatest" %% "scalatest" % "2.1.2" % "test", "log4j" % "log4j" % "1.2.14" % "test" )
ちょっと、今回はInfinispanのログには黙っていてもらいました。
src/test/resources/log4j.xml
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" > <appender name="empty" class="org.apache.log4j.varia.NullAppender"> </appender> <root> <appender-ref ref="empty"/> </root> </log4j:configuration>
Infinispanの基本設定。JGroupsの設定は、ここでは端折ります。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd" xmlns="urn:infinispan:config:6.0"> <global> <transport clusterName="hash-algorithm-cluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" allowDuplicateDomains="true" /> <shutdown hookBehavior="REGISTER"/> </global> <!-- ここにnamedCacheを定義する --> </infinispan>
キャッシュの定義自体は、あとでまた載せていきます。
numSegmentsの設定を確認してみる
まずは、numSegmentsの設定を変えて動かしてみましょう。
以下のようなベースのテストコードを用意します。
src/test/scala/org/littlewings/infinispan/hashalgorithm/InfinispanSegmentsSpec.scala
package org.littlewings.infinispan.hashalgorithm import scala.collection.JavaConverters._ import org.infinispan.Cache import org.infinispan.manager.DefaultCacheManager import org.scalatest.FunSpec import org.scalatest.Matchers._ class InfinispanSegmentsSpec extends FunSpec { describe("infinispan segment spec") { // ここに、テストを書く! } def withCache[K, V](numInstances: Int, cacheName: String)(fun: Cache[K, V] => Unit): Unit = { val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml")) managers.foreach(_.getCache[K, V](cacheName)) val cache = managers.head.getCache[K, V](cacheName) try { fun(cache) } finally { cache.stop() managers.foreach(_.stop()) } } }
クラスタ内のNode数を指定して、クラスタを構成しつつテストをするコードですね。
使用するキャッシュの構成は、こんな感じ。
<namedCache name="singleSegmentCache"> <clustering mode="dist"> <hash numSegments="1" /> </clustering> </namedCache> <namedCache name="defaultCache"> <clustering mode="dist" /> </namedCache>
片方は、numSegmentsを1に、もう片方はデフォルト(60)です。
これらに対して、テストを書いていきます。
まずは、numSegmentsが1の方のテスト。
it("singleSegmentCache") { println("========== Start singleSegmentCache ===========") withCache[String, String](4, "singleSegmentCache") { cache => val range = 1 to 10 range.foreach(i => cache.put(s"key$i", s"value$i")) val dm = cache.getAdvancedCache.getDistributionManager val consistentHash = dm.getConsistentHash consistentHash.getNumSegments should be (1) } println("========== End singleSegmentCache ===========") }
ConsistentHashから取得できる、セグメントの数が1になっています。ちなみに、これ以降も含めて、クラスタ内のNode数は4となります。
続いて、デフォルトの場合。
it("defaultCache") { println("========== Start defaultCache ===========") withCache[String, String](4, "defaultCache") { cache => val range = 1 to 10 range.foreach(i => cache.put(s"key$i", s"value$i")) val dm = cache.getAdvancedCache.getDistributionManager val consistentHash = dm.getConsistentHash consistentHash.getNumSegments should be (60) dm.getPrimaryLocation("key1") should be (consistentHash.locatePrimaryOwner("key1")) dm.locate("key10") should be (consistentHash.locateOwners("key10")) consistentHash.locatePrimaryOwnerForSegment(0) should not be (null) consistentHash.locatePrimaryOwnerForSegment(59) should not be (null) an [ArrayIndexOutOfBoundsException] should be thrownBy consistentHash.locatePrimaryOwnerForSegment(60) } println("========== End defaultCache ===========") }
この場合、セグメントは60個あります。
consistentHash.getNumSegments should be (60)
範囲は、0〜59のようです。
consistentHash.locatePrimaryOwnerForSegment(0) should not be (null) consistentHash.locatePrimaryOwnerForSegment(59) should not be (null) an [ArrayIndexOutOfBoundsException] should be thrownBy consistentHash.locatePrimaryOwnerForSegment(60)
なお、セグメントの数はクラスタの最大サイズ×10が推奨値らしいです。
Controls the total number of hash space segments (per cluster). Recommended value is 10 * max_cluster_size. Defaults to 60.
http://docs.jboss.org/infinispan/6.0/configdocs/infinispan-config-6.0.html
Hash Algorithmの設定を変えてみる
続いては、セグメントの分配を行う、ハッシュアルゴリズムの設定を変えてみましょう。
キャッシュの設定としては、以下のようなものを用意しました。
<namedCache name="defaultConsistentHashCache"> <clustering mode="dist"> <hash factory="org.infinispan.distribution.ch.DefaultConsistentHashFactory" /> </clustering> </namedCache> <namedCache name="syncConsistentHashCache"> <clustering mode="dist"> <hash factory="org.infinispan.distribution.ch.SyncConsistentHashFactory" /> </clustering> </namedCache>
最初のは、実はデフォルトですが。
続いて、テストコードの骨格。
src/test/scala/org/littlewings/infinispan/hashalgorithm/InfinispanHashAlgorithmSpec.scala
package org.littlewings.infinispan.hashalgorithm import scala.collection._ import scala.collection.JavaConverters._ import scala.util.Random import org.infinispan.Cache import org.infinispan.manager.{EmbeddedCacheManager, DefaultCacheManager} import org.scalatest.{BeforeAndAfter, FunSpec} import org.scalatest.Matchers._ class InfinispanHashAlgorithmSpec extends FunSpec with BeforeAndAfter { private var otherManagers: mutable.ArrayBuffer[EmbeddedCacheManager] = _ before { otherManagers = mutable.ArrayBuffer.empty } describe("infinispan hash-algorithm spec") { // ここに、テストを書く! } def createCacheManager: EmbeddedCacheManager = new DefaultCacheManager("infinispan.xml") def withCache[K, V](numInstances: Int, cacheName: String)(fun: Cache[K, V] => Unit): Unit = { val managers = (1 to numInstances).map(_ => createCacheManager) val manager = managers.head otherManagers = mutable.ArrayBuffer.empty ++ managers.slice(1, numInstances) managers.foreach(_.getCache[K, V](cacheName)) val cache = manager.getCache[K, V](cacheName) try { fun(cache) } finally { cache.stop() (manager +: otherManagers).foreach(_.stop()) } } def downMember(): Unit = { val manager = otherManagers(Random.nextInt(otherManagers.size)) otherManagers -= manager manager.stop() Thread.sleep(2 * 1000) } def newMember[K, V](cache: Cache[K, V]): Unit = { val manager = createCacheManager manager.getCache[K, V](cache.getName) otherManagers += manager } }
先ほどのコードとそんなに変わらない感じですが、最初に起動したNode以外に対して、Nodeをダウンさせたり追加させたりすることができるようにしています。なお、ダウンさせるメンバはランダムです。
では、デフォルトのハッシュアルゴリズムの方で。
it("defaultConsistentHashCache") { println("========== Start defaultConsistentCache ===========") withCache[String, String](4, "defaultConsistentHashCache") { cache => val range = 1 to 10 range.foreach(i => cache.put(s"key$i", s"value$i")) val advancedCache = cache.getAdvancedCache val dm = advancedCache.getDistributionManager val consistentHash = dm.getConsistentHash consistentHash.getNumSegments should be (60) println(s"""|Cluster Members: |${advancedCache .getRpcManager .getMembers .asScala .mkString(" ", System.lineSeparator + " ", "")}""".stripMargin) println("===") println("Cluster Initial State[DefaultConsistentHash]:") range.foreach(i => println(s" Key[key$i]: Segment[${dm.getConsistentHash.getSegment(s"key$i")}] Primary[${dm.getPrimaryLocation(s"key$i")}]")) downMember() newMember(cache) // downMember() // newMember(cache) println("===") println(s"""|Cluster Members: |${advancedCache .getRpcManager .getMembers .asScala .mkString(" ", System.lineSeparator + " ", "")}""".stripMargin) println("===") println("Cluster Member Changed State[DefaultConsistentHash]:") range.foreach(i => cache.put(s"key$i", s"value$i")) range.foreach(i => println(s" Key[key$i]: Segment[${dm.getConsistentHash.getSegment(s"key$i")}] Primary[${dm.getPrimaryLocation(s"key$i")}]")) } println("========== End defaultConsistentCache ===========") }
データを投入した後、クラスタのメンバの表示とデータの配置状況の表示を行います。その後、クラスタからひとつメンバーを落として新規にひとつ追加して、再度メンバと配置状況を表示します。
最初は2つのNodeを落として追加していたのですが、ひとつの方が結果がわかりやすかったので、2回目はコメントアウトしています。
こちらは、動作結果を貼っておきましょう。
初期状態。
Cluster Members: my-hostname-13196 my-hostname-45111 my-hostname-18288 my-hostname-32168 === Cluster Initial State[DefaultConsistentHash]: Key[key1]: Segment[10] Primary[my-hostname-13196] Key[key2]: Segment[2] Primary[my-hostname-13196] Key[key3]: Segment[4] Primary[my-hostname-13196] Key[key4]: Segment[21] Primary[my-hostname-18288] Key[key5]: Segment[49] Primary[my-hostname-32168] Key[key6]: Segment[18] Primary[my-hostname-32168] Key[key7]: Segment[24] Primary[my-hostname-18288] Key[key8]: Segment[13] Primary[my-hostname-13196] Key[key9]: Segment[42] Primary[my-hostname-45111] Key[key10]: Segment[37] Primary[my-hostname-45111]
メンバー退出、追加後。
Cluster Members: my-hostname-13196 my-hostname-45111 my-hostname-18288 my-hostname-43450 === Cluster Member Changed State[DefaultConsistentHash]: Key[key1]: Segment[10] Primary[my-hostname-13196] Key[key2]: Segment[2] Primary[my-hostname-13196] Key[key3]: Segment[4] Primary[my-hostname-13196] Key[key4]: Segment[21] Primary[my-hostname-18288] Key[key5]: Segment[49] Primary[my-hostname-18288] Key[key6]: Segment[18] Primary[my-hostname-43450] Key[key7]: Segment[24] Primary[my-hostname-18288] Key[key8]: Segment[13] Primary[my-hostname-13196] Key[key9]: Segment[42] Primary[my-hostname-45111] Key[key10]: Segment[37] Primary[my-hostname-45111]
今回は、最後のNodeがダウンしたようです。
前後で比較すると、この部分しか差がありません。ダウン前。
Key[key5]: Segment[49] Primary[my-hostname-32168] Key[key6]: Segment[18] Primary[my-hostname-32168]
ダウン後。
Key[key5]: Segment[49] Primary[my-hostname-18288] Key[key6]: Segment[18] Primary[my-hostname-43450]
新しいメンバ(my-hostname-43450)ーに対しては、プライマリオーナーとなっているセグメントはひとつしかないので、ちょっとアンバランスですね。
続いて、SyncConsistentHashFactoryを使用した場合。
it("syncConsistentHashCache") { println("========== Start syncConsistentCache ===========") withCache[String, String](4, "syncConsistentHashCache") { cache => val range = 1 to 10 range.foreach(i => cache.put(s"key$i", s"value$i")) val advancedCache = cache.getAdvancedCache val dm = advancedCache.getDistributionManager val consistentHash = dm.getConsistentHash consistentHash.getNumSegments should be (60) println(s"""|Cluster Members: |${advancedCache .getRpcManager .getMembers .asScala .mkString(" ", System.lineSeparator + " ", "")}""".stripMargin) println("===") println("Cluster Initial State[SyncConsistentHash]:") range.foreach(i => println(s" Key[key$i]: Segment[${dm.getConsistentHash.getSegment(s"key$i")}] Primary[${dm.getPrimaryLocation(s"key$i")}]")) downMember() newMember(cache) // downMember() // newMember(cache) println("===") println(s"""|Cluster Members: |${advancedCache .getRpcManager .getMembers .asScala .mkString(" ", System.lineSeparator + " ", "")}""".stripMargin) println("===") println("Cluster Member Changed State[SyncConsistentHash]:") range.foreach(i => cache.put(s"key$i", s"value$i")) range.foreach(i => println(s" Key[key$i]: Segment[${dm.getConsistentHash.getSegment(s"key$i")}] Primary[${dm.getPrimaryLocation(s"key$i")}]")) } println("========== End syncConsistentCache ===========") }
初期状態。
Cluster Members: my-hostname-20014 my-hostname-42566 my-hostname-48745 my-hostname-18569 === Cluster Initial State[SyncConsistentHash]: Key[key1]: Segment[10] Primary[my-hostname-42566] Key[key2]: Segment[2] Primary[my-hostname-18569] Key[key3]: Segment[4] Primary[my-hostname-18569] Key[key4]: Segment[21] Primary[my-hostname-48745] Key[key5]: Segment[49] Primary[my-hostname-18569] Key[key6]: Segment[18] Primary[my-hostname-20014] Key[key7]: Segment[24] Primary[my-hostname-20014] Key[key8]: Segment[13] Primary[my-hostname-42566] Key[key9]: Segment[42] Primary[my-hostname-42566] Key[key10]: Segment[37] Primary[my-hostname-20014]
メンバー退出、追加後。
Cluster Members: my-hostname-20014 my-hostname-42566 my-hostname-18569 my-hostname-25460 === Cluster Member Changed State[SyncConsistentHash]: Key[key1]: Segment[10] Primary[my-hostname-42566] Key[key2]: Segment[2] Primary[my-hostname-25460] Key[key3]: Segment[4] Primary[my-hostname-25460] Key[key4]: Segment[21] Primary[my-hostname-18569] Key[key5]: Segment[49] Primary[my-hostname-18569] Key[key6]: Segment[18] Primary[my-hostname-42566] Key[key7]: Segment[24] Primary[my-hostname-20014] Key[key8]: Segment[13] Primary[my-hostname-25460] Key[key9]: Segment[42] Primary[my-hostname-42566] Key[key10]: Segment[37] Primary[my-hostname-25460]
今回は、3番目のNodeがダウンしたようです。
前後で比較すると、けっこうプライマリ・オーナーが移動しています。差分だけ、記述します。ダウン前。
Key[key2]: Segment[2] Primary[my-hostname-18569] Key[key3]: Segment[4] Primary[my-hostname-18569] Key[key4]: Segment[21] Primary[my-hostname-48745] Key[key6]: Segment[18] Primary[my-hostname-20014] Key[key8]: Segment[13] Primary[my-hostname-42566] Key[key10]: Segment[37] Primary[my-hostname-20014]
ダウン後。
Key[key2]: Segment[2] Primary[my-hostname-25460] Key[key3]: Segment[4] Primary[my-hostname-25460] Key[key4]: Segment[21] Primary[my-hostname-18569] Key[key6]: Segment[18] Primary[my-hostname-42566] Key[key8]: Segment[13] Primary[my-hostname-25460] Key[key10]: Segment[37] Primary[my-hostname-25460]
というわけで、SyncConsistentHashにすると、できる限り均等になるように分配してくれるようですが、その分セグメントの移動量も多くなるということですね。
勉強になりましたー。
今回作成したコードは、こちらにアップしています。
https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-hash-algorithm