CLOVER🍀

That was when it all began.

InfinispanのSegmentsとHashing Algorithmの設定で遊んでみる

以前Segmentsの設定の確認だけはやったことはあったのですが、もう少し理解を進めるために、復習を兼ねて遊んでみます。

7.4.2. Hashing Algorithms
http://infinispan.org/docs/6.0.x/user_guide/user_guide.html#_hashing_algorithms

以前、Segmentsの設定に関してだけは試したことがあったのですが、ここで書かれていた意味がもうちょっと頭に入ったのは他のデータグリッドを使った後でしたね。

InfinispanのSegmentsを使ってみる
http://d.hatena.ne.jp/Kazuhira/20130901/1378034880

InfinispanのSegmentsというのは、CoherenceやHazelcastでいうPartitionと考えればよさそうです。

ちなみに、以前エントリを書いた時に見ていたInfinispanのドキュメントは、サイトの移行に伴いドキュメントの内容自体が変わってしまいました。

これらの話は、Infinispanがクラスタ内で各メンバーに対して、エントリの管理をどう割り当てるかという内容みたいです。キーを割り当てる時に、一定の領域に対してそのキーを管理するように割り当てるのですが、これをセグメントと呼んでいるみたいですねぇ。

ドキュメントを読む

それでは、新しく書かれた「7.4.2. Hashing Algorithms」を頑張って読んでみます。

概要

Infinispanのハッシュアルゴリズムは、コンシステントハッシュ法をベースにしていてはいるものの、少し異なる実装になっているようです。

コンシステントハッシュ法とは異なり、固定のセグメントにキーを配置する空間を分割します。セグメントの数はnumSegmentsで設定可能ですが、クラスタの再起動なしには変更することはできません。セグメントへのキーのマッピングは、固定となります。クラスタのトポロジがどのように変わったかに関わらず、同じセグメントにキーを割り当てるのが望ましいです。

各ハッシュ・セグメントは、オーナーと呼ばれるNodeのリストにマップされます。最初のオーナーはプライマリ・オーナーとして知られ、ロックのような多くのキャッシュ操作で特別な役割を持つため、順序は重要になります。他のオーナーは、バックアップ・オーナーと呼ばれています。各オーナーに対して、セグメントをどのようにマップするかの法則が決まっているわけではありません。ハッシュアルゴリズムは一般に各Nodeに割り当てられたセグメントの数のバランスを取ろうとし、クラスタにNodeが参加したり退出した後には、セグメントの移動の数を最小にする必要があります。

Infinispanで使えるハッシュアルゴリズム

Infinispanのハッシュアルゴリズムはカスタマイズ可能で、デフォルトでは5つの実装が用意されています。

DefaultConsistentHashFactory
デフォルトのハッシュアルゴリズムでかなり均一の分散を実現しますが、ひとつ欠点があります。Nodeのセグメントへのマッピングは、キャッシュのクラスタへの参加した順番に依存します。よって、キーのオーナーがクラスタ内で実行されているすべてのキャッシュで同じであることが保証されていません
TopologyAwareConsistentHashFactory
サーバ・ヒンティング(サーバの物理配置の設定)を有効にした場合は、自動的に選ばれます。デフォルトのアルゴリズムと似ていますが、可能な限り多くのサイト、ラック、マシンの間で各セグメントのコピーを拡散します
SyncConsistentHashFactory
コンシステントハッシュ法によく似た別のアルゴリズムで、デフォルトのアルゴリズムの弱点に対応して、クラスタが長時間対称的になるように、あるキーがいつも各キャッシュの同じNodeに割り当てられるようにします。しかし、これ自身にも弱点はあり、負荷分散がそれほどでなくても、クラスタへのNodeの参加、退出に伴いセグメントを必要以上に移動するかもしれません
TopologyAwareSyncConsistentHashFactory
SyncConsistentHashFactoryによく似ていますが、サーバ・ヒンティングに対応しています
ReplicatedConsistentHashFactory
キャッシュがレプリケーションモードの時に内部的に使用されるアルゴリズムで、分散キャッシュのユーザが明示的に選択することはありません
というわけで

セグメントと、その時のハッシュアルゴリズムでした。ここに、Infinispan 6.0から使用できるcapacityFactorを加えることで、Nodeが持つデータに偏りを持たせることができます。

サーバの処理能力に合わせての分配や、クラスタに参加したいけれどデータは持ちたくない場合などに使用します。

試してみる

それでは、これらの設定をかいつまんで試してみましょう。

準備

まずは、Infinispanを使うために依存関係の定義。
build.sbt

name := "infinispan-hash-algorithm"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked")

fork in Test := true

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "6.0.2.Final" excludeAll(
    ExclusionRule(organization = "org.jgroups", name = "jgroups"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"),
    ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"),
    ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec")
  ),
  "org.jgroups" % "jgroups" % "3.4.1.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final",
  "org.jboss.marshalling" % "jboss-marshalling-river" % "1.4.4.Final",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.4.Final",
  "org.jboss.logging" % "jboss-logging" % "3.1.2.GA",
  "net.jcip" % "jcip-annotations" % "1.0",
  "org.scalatest" %% "scalatest" % "2.1.2" % "test",
  "log4j" % "log4j" % "1.2.14" % "test"
)

ちょっと、今回はInfinispanのログには黙っていてもらいました。
src/test/resources/log4j.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" >

  <appender name="empty" class="org.apache.log4j.varia.NullAppender">
  </appender>

  <root>
    <appender-ref ref="empty"/>
  </root>
</log4j:configuration>

Infinispanの基本設定。JGroupsの設定は、ここでは端折ります。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">

  <global>
    <transport clusterName="hash-algorithm-cluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>

    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        allowDuplicateDomains="true"
        />

    <shutdown hookBehavior="REGISTER"/>
  </global>

  <!-- ここにnamedCacheを定義する -->

</infinispan>

キャッシュの定義自体は、あとでまた載せていきます。

numSegmentsの設定を確認してみる

まずは、numSegmentsの設定を変えて動かしてみましょう。

以下のようなベースのテストコードを用意します。
src/test/scala/org/littlewings/infinispan/hashalgorithm/InfinispanSegmentsSpec.scala

package org.littlewings.infinispan.hashalgorithm

import scala.collection.JavaConverters._

import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class InfinispanSegmentsSpec extends FunSpec {
  describe("infinispan segment spec") {
    // ここに、テストを書く!
  }


  def withCache[K, V](numInstances: Int, cacheName: String)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))

    managers.foreach(_.getCache[K, V](cacheName))

    val cache = managers.head.getCache[K, V](cacheName)
    try {
      fun(cache)
    } finally {
      cache.stop()
      managers.foreach(_.stop())
    }
  }
}

クラスタ内のNode数を指定して、クラスタを構成しつつテストをするコードですね。

使用するキャッシュの構成は、こんな感じ。

  <namedCache name="singleSegmentCache">
    <clustering mode="dist">
      <hash numSegments="1" />
    </clustering>
  </namedCache>

  <namedCache name="defaultCache">
    <clustering mode="dist" />
  </namedCache>

片方は、numSegmentsを1に、もう片方はデフォルト(60)です。

これらに対して、テストを書いていきます。

まずは、numSegmentsが1の方のテスト。

    it("singleSegmentCache") {
      println("========== Start singleSegmentCache ===========")

      withCache[String, String](4, "singleSegmentCache") { cache =>
        val range = 1 to 10

        range.foreach(i => cache.put(s"key$i", s"value$i"))

        val dm = cache.getAdvancedCache.getDistributionManager
        val consistentHash = dm.getConsistentHash

        consistentHash.getNumSegments should be (1)
      }

      println("========== End singleSegmentCache ===========")
    }

ConsistentHashから取得できる、セグメントの数が1になっています。ちなみに、これ以降も含めて、クラスタ内のNode数は4となります。

続いて、デフォルトの場合。

    it("defaultCache") {
      println("========== Start defaultCache ===========")

      withCache[String, String](4, "defaultCache") { cache =>
        val range = 1 to 10

        range.foreach(i => cache.put(s"key$i", s"value$i"))

        val dm = cache.getAdvancedCache.getDistributionManager
        val consistentHash = dm.getConsistentHash

        consistentHash.getNumSegments should be (60)

        dm.getPrimaryLocation("key1") should be (consistentHash.locatePrimaryOwner("key1"))
        dm.locate("key10") should be (consistentHash.locateOwners("key10"))

        consistentHash.locatePrimaryOwnerForSegment(0) should not be (null)
        consistentHash.locatePrimaryOwnerForSegment(59) should not be (null)
        an [ArrayIndexOutOfBoundsException] should be thrownBy consistentHash.locatePrimaryOwnerForSegment(60)
      }

      println("========== End defaultCache ===========")
    }

この場合、セグメントは60個あります。

        consistentHash.getNumSegments should be (60)

範囲は、0〜59のようです。

        consistentHash.locatePrimaryOwnerForSegment(0) should not be (null)
        consistentHash.locatePrimaryOwnerForSegment(59) should not be (null)
        an [ArrayIndexOutOfBoundsException] should be thrownBy consistentHash.locatePrimaryOwnerForSegment(60)

なお、セグメントの数はクラスタの最大サイズ×10が推奨値らしいです。

Controls the total number of hash space segments (per cluster). Recommended value is 10 * max_cluster_size. Defaults to 60.

http://docs.jboss.org/infinispan/6.0/configdocs/infinispan-config-6.0.html
Hash Algorithmの設定を変えてみる

続いては、セグメントの分配を行う、ハッシュアルゴリズムの設定を変えてみましょう。

キャッシュの設定としては、以下のようなものを用意しました。

  <namedCache name="defaultConsistentHashCache">
    <clustering mode="dist">
      <hash factory="org.infinispan.distribution.ch.DefaultConsistentHashFactory" />
    </clustering>
  </namedCache>

  <namedCache name="syncConsistentHashCache">
    <clustering mode="dist">
      <hash factory="org.infinispan.distribution.ch.SyncConsistentHashFactory" />
    </clustering>
  </namedCache>

最初のは、実はデフォルトですが。

続いて、テストコードの骨格。
src/test/scala/org/littlewings/infinispan/hashalgorithm/InfinispanHashAlgorithmSpec.scala

package org.littlewings.infinispan.hashalgorithm

import scala.collection._
import scala.collection.JavaConverters._
import scala.util.Random

import org.infinispan.Cache
import org.infinispan.manager.{EmbeddedCacheManager, DefaultCacheManager}

import org.scalatest.{BeforeAndAfter, FunSpec}
import org.scalatest.Matchers._

class InfinispanHashAlgorithmSpec extends FunSpec with BeforeAndAfter {
  private var otherManagers: mutable.ArrayBuffer[EmbeddedCacheManager] = _

  before {
    otherManagers = mutable.ArrayBuffer.empty
  }

  describe("infinispan hash-algorithm spec") {
    // ここに、テストを書く!
  }

  def createCacheManager: EmbeddedCacheManager =
    new DefaultCacheManager("infinispan.xml")

  def withCache[K, V](numInstances: Int, cacheName: String)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => createCacheManager)

    val manager = managers.head
    otherManagers = mutable.ArrayBuffer.empty ++ managers.slice(1, numInstances)

    managers.foreach(_.getCache[K, V](cacheName))

    val cache = manager.getCache[K, V](cacheName)
    try {
      fun(cache)
    } finally {
      cache.stop()
      (manager +: otherManagers).foreach(_.stop())
    }
  }

  def downMember(): Unit = {
    val manager = otherManagers(Random.nextInt(otherManagers.size))
    otherManagers -= manager
    manager.stop()

    Thread.sleep(2 * 1000)
  }

  def newMember[K, V](cache: Cache[K, V]): Unit = {
    val manager = createCacheManager
    manager.getCache[K, V](cache.getName)
    otherManagers += manager
  }
}

先ほどのコードとそんなに変わらない感じですが、最初に起動したNode以外に対して、Nodeをダウンさせたり追加させたりすることができるようにしています。なお、ダウンさせるメンバはランダムです。

では、デフォルトのハッシュアルゴリズムの方で。

    it("defaultConsistentHashCache") {
      println("========== Start defaultConsistentCache ===========")

      withCache[String, String](4, "defaultConsistentHashCache") { cache =>
        val range = 1 to 10

        range.foreach(i => cache.put(s"key$i", s"value$i"))

        val advancedCache = cache.getAdvancedCache
        val dm = advancedCache.getDistributionManager
        val consistentHash = dm.getConsistentHash

        consistentHash.getNumSegments should be (60)

        println(s"""|Cluster Members:
                    |${advancedCache
                        .getRpcManager
                        .getMembers
                        .asScala
                        .mkString("  ", System.lineSeparator + "  ", "")}""".stripMargin)

        println("===")

        println("Cluster Initial State[DefaultConsistentHash]:")
        range.foreach(i => println(s"  Key[key$i]: Segment[${dm.getConsistentHash.getSegment(s"key$i")}] Primary[${dm.getPrimaryLocation(s"key$i")}]"))

        downMember()
        newMember(cache)

        // downMember()
        // newMember(cache)

        println("===")

        println(s"""|Cluster Members:
                    |${advancedCache
                        .getRpcManager
                        .getMembers
                        .asScala
                        .mkString("  ", System.lineSeparator + "  ", "")}""".stripMargin)

        println("===")

        println("Cluster Member Changed State[DefaultConsistentHash]:")
        range.foreach(i => cache.put(s"key$i", s"value$i"))
        range.foreach(i => println(s"  Key[key$i]: Segment[${dm.getConsistentHash.getSegment(s"key$i")}] Primary[${dm.getPrimaryLocation(s"key$i")}]"))
      }

      println("========== End defaultConsistentCache ===========")
    }

データを投入した後、クラスタのメンバの表示とデータの配置状況の表示を行います。その後、クラスタからひとつメンバーを落として新規にひとつ追加して、再度メンバと配置状況を表示します。

最初は2つのNodeを落として追加していたのですが、ひとつの方が結果がわかりやすかったので、2回目はコメントアウトしています。

こちらは、動作結果を貼っておきましょう。

初期状態。

Cluster Members:
  my-hostname-13196
  my-hostname-45111
  my-hostname-18288
  my-hostname-32168
===
Cluster Initial State[DefaultConsistentHash]:
  Key[key1]: Segment[10] Primary[my-hostname-13196]
  Key[key2]: Segment[2] Primary[my-hostname-13196]
  Key[key3]: Segment[4] Primary[my-hostname-13196]
  Key[key4]: Segment[21] Primary[my-hostname-18288]
  Key[key5]: Segment[49] Primary[my-hostname-32168]
  Key[key6]: Segment[18] Primary[my-hostname-32168]
  Key[key7]: Segment[24] Primary[my-hostname-18288]
  Key[key8]: Segment[13] Primary[my-hostname-13196]
  Key[key9]: Segment[42] Primary[my-hostname-45111]
  Key[key10]: Segment[37] Primary[my-hostname-45111]

メンバー退出、追加後。

Cluster Members:
  my-hostname-13196
  my-hostname-45111
  my-hostname-18288
  my-hostname-43450
===
Cluster Member Changed State[DefaultConsistentHash]:
  Key[key1]: Segment[10] Primary[my-hostname-13196]
  Key[key2]: Segment[2] Primary[my-hostname-13196]
  Key[key3]: Segment[4] Primary[my-hostname-13196]
  Key[key4]: Segment[21] Primary[my-hostname-18288]
  Key[key5]: Segment[49] Primary[my-hostname-18288]
  Key[key6]: Segment[18] Primary[my-hostname-43450]
  Key[key7]: Segment[24] Primary[my-hostname-18288]
  Key[key8]: Segment[13] Primary[my-hostname-13196]
  Key[key9]: Segment[42] Primary[my-hostname-45111]
  Key[key10]: Segment[37] Primary[my-hostname-45111]

今回は、最後のNodeがダウンしたようです。

前後で比較すると、この部分しか差がありません。ダウン前。

  Key[key5]: Segment[49] Primary[my-hostname-32168]
  Key[key6]: Segment[18] Primary[my-hostname-32168]

ダウン後。

  Key[key5]: Segment[49] Primary[my-hostname-18288]
  Key[key6]: Segment[18] Primary[my-hostname-43450]

新しいメンバ(my-hostname-43450)ーに対しては、プライマリオーナーとなっているセグメントはひとつしかないので、ちょっとアンバランスですね。

続いて、SyncConsistentHashFactoryを使用した場合。

    it("syncConsistentHashCache") {
      println("========== Start syncConsistentCache ===========")

      withCache[String, String](4, "syncConsistentHashCache") { cache =>
        val range = 1 to 10

        range.foreach(i => cache.put(s"key$i", s"value$i"))

        val advancedCache = cache.getAdvancedCache
        val dm = advancedCache.getDistributionManager
        val consistentHash = dm.getConsistentHash

        consistentHash.getNumSegments should be (60)

        println(s"""|Cluster Members:
                    |${advancedCache
                        .getRpcManager
                        .getMembers
                        .asScala
                        .mkString("  ", System.lineSeparator + "  ", "")}""".stripMargin)

        println("===")

        println("Cluster Initial State[SyncConsistentHash]:")
        range.foreach(i => println(s"  Key[key$i]: Segment[${dm.getConsistentHash.getSegment(s"key$i")}] Primary[${dm.getPrimaryLocation(s"key$i")}]"))

        downMember()
        newMember(cache)

        // downMember()
        // newMember(cache)

        println("===")

        println(s"""|Cluster Members:
                    |${advancedCache
                        .getRpcManager
                        .getMembers
                        .asScala
                        .mkString("  ", System.lineSeparator + "  ", "")}""".stripMargin)

        println("===")

        println("Cluster Member Changed State[SyncConsistentHash]:")
        range.foreach(i => cache.put(s"key$i", s"value$i"))
        range.foreach(i => println(s"  Key[key$i]: Segment[${dm.getConsistentHash.getSegment(s"key$i")}] Primary[${dm.getPrimaryLocation(s"key$i")}]"))
      }        

      println("========== End syncConsistentCache ===========")
    }

初期状態。

Cluster Members:
  my-hostname-20014
  my-hostname-42566
  my-hostname-48745
  my-hostname-18569
===
Cluster Initial State[SyncConsistentHash]:
  Key[key1]: Segment[10] Primary[my-hostname-42566]
  Key[key2]: Segment[2] Primary[my-hostname-18569]
  Key[key3]: Segment[4] Primary[my-hostname-18569]
  Key[key4]: Segment[21] Primary[my-hostname-48745]
  Key[key5]: Segment[49] Primary[my-hostname-18569]
  Key[key6]: Segment[18] Primary[my-hostname-20014]
  Key[key7]: Segment[24] Primary[my-hostname-20014]
  Key[key8]: Segment[13] Primary[my-hostname-42566]
  Key[key9]: Segment[42] Primary[my-hostname-42566]
  Key[key10]: Segment[37] Primary[my-hostname-20014]

メンバー退出、追加後。

Cluster Members:
  my-hostname-20014
  my-hostname-42566
  my-hostname-18569
  my-hostname-25460
===
Cluster Member Changed State[SyncConsistentHash]:
  Key[key1]: Segment[10] Primary[my-hostname-42566]
  Key[key2]: Segment[2] Primary[my-hostname-25460]
  Key[key3]: Segment[4] Primary[my-hostname-25460]
  Key[key4]: Segment[21] Primary[my-hostname-18569]
  Key[key5]: Segment[49] Primary[my-hostname-18569]
  Key[key6]: Segment[18] Primary[my-hostname-42566]
  Key[key7]: Segment[24] Primary[my-hostname-20014]
  Key[key8]: Segment[13] Primary[my-hostname-25460]
  Key[key9]: Segment[42] Primary[my-hostname-42566]
  Key[key10]: Segment[37] Primary[my-hostname-25460]

今回は、3番目のNodeがダウンしたようです。

前後で比較すると、けっこうプライマリ・オーナーが移動しています。差分だけ、記述します。ダウン前。

  Key[key2]: Segment[2] Primary[my-hostname-18569]
  Key[key3]: Segment[4] Primary[my-hostname-18569]
  Key[key4]: Segment[21] Primary[my-hostname-48745]

  Key[key6]: Segment[18] Primary[my-hostname-20014]

  Key[key8]: Segment[13] Primary[my-hostname-42566]

  Key[key10]: Segment[37] Primary[my-hostname-20014]

ダウン後。

  Key[key2]: Segment[2] Primary[my-hostname-25460]
  Key[key3]: Segment[4] Primary[my-hostname-25460]
  Key[key4]: Segment[21] Primary[my-hostname-18569]

  Key[key6]: Segment[18] Primary[my-hostname-42566]

  Key[key8]: Segment[13] Primary[my-hostname-25460]

  Key[key10]: Segment[37] Primary[my-hostname-25460]

というわけで、SyncConsistentHashにすると、できる限り均等になるように分配してくれるようですが、その分セグメントの移動量も多くなるということですね。

勉強になりましたー。

今回作成したコードは、こちらにアップしています。

https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-hash-algorithm