CLOVER🍀

That was when it all began.

Infinispanでヘテロなクラスタを組んでみる

2014/4/22)
Issue ISPN-3752での修正後の確認結果に更新しました。
このエントリは、Infinispan 6.0.2.Finalで書き直しています。

Infinispan 6.0.0.Finalがリリースされました。けっこうリリースを楽しみにしていたのですが、その中でもちょっと気になっていたのがこちらの機能追加。

Heterogenous clusters with Infinispan 6.0.0.Beta1
http://blog.infinispan.org/2013/09/heterogenous-clusters-with-infinispan.html

Allow configuring the number of segments per node(ISPN-3051)
https://issues.jboss.org/browse/ISPN-3051

Distributionクラスタを組む時に、各Nodeが持つデータに偏りを持たせることができます。これは、capacityFactorという属性値で設定します。デフォルト値は、1.0でfloatで指定するようです。

例えば、あるNodeでは

<namedCache name="capacity_factors">
  <clustering mode="distribution">
    <hash capacityFactor="2.0"/>
  </clustering>
</namedCache>

と指定することで、デフォルトの1.0と指定されたNodeの2倍のデータを持ちます。また、あるNodeでは

<namedCache name="capacity_factors">
  <clustering mode="distribution">
    <hash capacityFactor="0.5"/>
  </clustering>
</namedCache>

と指定することでデフォルトのノードの半分となるそうな。

特に、特徴的なのが

<namedCache name="capacity_factors">
  <clustering mode="distribution">
    <hash capacityFactor="0"/>
  </clustering>
</namedCache>

と指定するパターンで、これはデータをまったく持たないことを意味します。

Oracle Coherenceでいう「tangosol.coherence.distributed.localstorage」をfalseに指定するようなもので、実際そのようなことが先のISPN-3051にも書かれています。

つまり、これまでEmbedded Cache(ライブラリモード)であれば、データが各Nodeにそれぞれ分配されてしまっていたのですが、capacityFactorを指定することで、以下のようなことが可能になります。

  • capacityFactorの値を調整することで、Node(というかマシン)のスペックに応じた、データ量の持つようにする
  • capacityFactorを0のNodeをクラスタに参加させることで、データを持たないクライアント/サーバ的な構成をとる

特にcapacityFactorが0のNodeは特徴的で、これを利用することで高機能なEmbedded Cacheとして使いつつ、Hot Rodのようなクライアント/サーバの構成を取れるというのはかなり面白いと思います。L1 Cacheも組み合わせると、さらに幅が広がりそうですね。

もちろん、Hot Rodとの使い分けはケースバイケースでしょうし、capacityFactorが0だからといって頻繁にクラスタに参加/離脱をしたりするのもよくないと思われるので、そのあたりはきっと注意することに。

というわけで、6.0.0.Finalが出たのに気付いてから、ちょっと試してみました。が、…リリース当時はこの機能にバグがあり、修正後再度確認しました。

今回は、capacityFactorが0のクライアントと、capacityFactorが1のサーバを構成する感じで作ってみたいと思います。

準備

まずは、build.sbt。アルファ版の頃から、sbtでの依存関係の解決が変な動きをしていると思っていましたが、やっぱりFinalでも発生しました。

結果、こんなbuild.sbtになりました。
*エントリの書き直しの結果、Scalaは2.11.0になりました

name := "infinispan-heterogeneous-cluster"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.0"

organization := "littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked")

fork in run := true

connectInput in run := true

// resolvers += "Public JBoss Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss"

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "6.0.2.Final" excludeAll(
    ExclusionRule(organization = "org.jgroups", name = "jgroups"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"),
    ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"),
    ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec")
  ),
  "org.jgroups" % "jgroups" % "3.4.1.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final",
  "org.jboss.marshalling" % "jboss-marshalling-river" % "1.4.4.Final",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.4.Final",
  "org.jboss.logging" % "jboss-logging" % "3.1.2.GA",
  "net.jcip" % "jcip-annotations" % "1.0"
)

そう、なんとJBoss系のライブラリがほとんど解決できていらっしゃらない(笑)。infinispan-coreを指定すると、他に6つライブラリを取得しようとするんですけど、うち5つが解決できませんでした…。あと、jcip-annotationsはScalaでInfinispanを使用する場合は、明示的に指定する必要があります。

ちなみに、

  "org.infinispan" % "infinispan-core" % "6.0.2.Final"

だけにすると、こうなります。

[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[warn] 	::          UNRESOLVED DEPENDENCIES         ::
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[warn] 	:: org.jboss.logging#jboss-logging;${version.jboss.logging}: not found
[warn] 	:: org.jgroups#jgroups;${version.jgroups}: not found
[warn] 	:: org.jboss.spec.javax.transaction#jboss-transaction-api_1.1_spec;${version.jta}: not found
[warn] 	:: org.jboss.marshalling#jboss-marshalling-river;${version.jboss.marshalling}: not found
[warn] 	:: org.jboss.marshalling#jboss-marshalling;${version.jboss.marshalling}: not found
[warn] 	::::::::::::::::::::::::::::::::::::::::::::::
[trace] Stack trace suppressed: run last *:update for the full output.
[error] (*:update) sbt.ResolveException: unresolved dependency: org.jboss.logging#jboss-logging;${version.jboss.logging}: not found
[error] unresolved dependency: org.jgroups#jgroups;${version.jgroups}: not found
[error] unresolved dependency: org.jboss.spec.javax.transaction#jboss-transaction-api_1.1_spec;${version.jta}: not found
[error] unresolved dependency: org.jboss.marshalling#jboss-marshalling-river;${version.jboss.marshalling}: not found
[error] unresolved dependency: org.jboss.marshalling#jboss-marshalling;${version.jboss.marshalling}: not found
[error] Total time: 4 s, completed 2013/11/22 1:09:17

続いて、設定ファイル。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">

  <global>
    <transport clusterName="heterogeneous-cluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        />

    <shutdown hookBehavior="REGISTER"/>
  </global>

  <default />

  <namedCache name="heterogeneous-cache">
    <clustering mode="distribution">
      <hash numOwners="1" capacityFactor="${capacity.factor:0}" />
    </clustering>
  </namedCache>
</infinispan>

capacityFactorは、「capacity.factor」というシステムプロパティで値を指定した場合はその値を使い、そうでない場合は0をデフォルト値とするようにしました。また、データの偏りがわかりやすいように、numOwnersは1としています。

続いて、JGroupsの設定。
src/main/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.4.xsd">
  <UDP
      mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}"
      mcast_port="${jgroups.udp.mcast_port:45688}"
      tos="8"
      ucast_recv_buf_size="130000"
      ucast_send_buf_size="100000"
      mcast_recv_buf_size="130000"
      mcast_send_buf_size="100000"
      loopback="true"
      max_bundle_size="31k"
      ip_ttl="${jgroups.udp.ip_ttl:2}"
      enable_diagnostics="false"
      bundler_type="old"

      thread_naming_pattern=""

      thread_pool.enabled="true"
      thread_pool.min_threads="2"
      thread_pool.max_threads="8"
      thread_pool.keep_alive_time="60000"
      thread_pool.queue_enabled="true"
      thread_pool.queue_max_size="100"
      thread_pool.rejection_policy="Discard"

      oob_thread_pool.enabled="true"
      oob_thread_pool.min_threads="2"
      oob_thread_pool.max_threads="8"
      oob_thread_pool.keep_alive_time="60000"
      oob_thread_pool.queue_enabled="false"
      oob_thread_pool.queue_max_size="100"
      oob_thread_pool.rejection_policy="Discard"

      internal_thread_pool.enabled="true"
      internal_thread_pool.min_threads="1"
      internal_thread_pool.max_threads="8"
      internal_thread_pool.keep_alive_time="60000"
      internal_thread_pool.queue_enabled="true"
      internal_thread_pool.queue_max_size="100"
      internal_thread_pool.rejection_policy="Discard"
      />

  <PING timeout="3000" num_initial_members="2" />
  <MERGE2 max_interval="30000" min_interval="10000"/>

  <FD_SOCK />
  <FD_ALL timeout="15000" interval="3000" />
  <VERIFY_SUSPECT timeout="1500"/>

  <pbcast.NAKACK2
      xmit_interval="1000"
      xmit_table_num_rows="100"
      xmit_table_msgs_per_row="10000"
      xmit_table_max_compaction_time="10000"
      max_msg_batch_size="100"/>

  <UNICAST3
      xmit_interval="500"
      xmit_table_num_rows="20"
      xmit_table_msgs_per_row="10000"
      xmit_table_max_compaction_time="10000"
      max_msg_batch_size="100"
      conn_expiry_timeout="0"/>

  <pbcast.STABLE stability_delay="500" desired_avg_gossip="5000" max_bytes="1m"/>
  <pbcast.GMS print_local_addr="true" join_timeout="3000" view_bundling="true"/>
  <!--  <tom.TOA/> --> <!-- the TOA is only needed for total order transactions-->

  <UFC max_credits="2m" min_threshold="0.40"/>
  <MFC max_credits="2m" min_threshold="0.40"/>
  <FRAG2 frag_size="30k"  />
  <RSVP timeout="60000" resend_interval="500" ack_on_delivery="false" />
</config>

Infinispanが依存しているJGroupsが3.4.1.Finalに上がったことと、現在のInfinispanのデフォルト設定を見ながら小さめの値で作ってみました。

サーバ

サーバ側のプログラム。
src/main/scala/NamedCacheStoreServer.scala

import org.infinispan.manager.DefaultCacheManager

import scala.io.StdIn

object NamedCacheStoreServer {
  def main(args: Array[String]): Unit = {
    System.setProperty("capacity.factor", "1.0")

    val manager = new DefaultCacheManager("infinispan.xml")

    try {
      val cache = manager.getCache[String, String]("heterogeneous-cache")

      StdIn.readLine("NamedCacheStoreManager Startup.")

      cache.stop()
    } finally {
      manager.stop()
    }
  }
}

単純に、浮いていてもらうようのプログラムです。最初にシステムプロパティを設定していて、capacityFactorが1.0になるようにしています。

そして、クライアント側。こちらは、capacityFactorが設定ファイルのデフォルト値である0となります。
src/main/scala/NamedCacheClient.scala

import scala.collection.JavaConverters._

import scala.io.StdIn

import org.infinispan.distribution.ch.DefaultConsistentHash
import org.infinispan.manager.DefaultCacheManager

object NamedCacheClient {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")

    try {
      val cache = manager.getCache[String, String]("heterogeneous-cache")
      val advancedCache = cache.getAdvancedCache
      val dm = advancedCache.getDistributionManager
      val rpc = advancedCache.getRpcManager

      val (keys, values) =
        (1 to 10).map { i => (s"key$i", s"value$i") }.unzip

      Iterator
        .continually(StdIn.readLine())
        .takeWhile(_ != "exit")
        .withFilter(l => l != null & !l.isEmpty)
        .foreach { command =>
          command.split("\\s").toList match {
            case "put-all" :: Nil =>
              // データ登録
              println("  [Put Cache Value]")
              keys.zip(values).foreach { case (k, v) =>
                println(s"  Key:$k => $v")
                cache.put(k, v)
              }
            case "get" :: key :: Nil =>
              // 特定のキーの値を取得
              println("  [Get]")
              println(s"  Key:$key => ${cache.get(key)}")
            case "get-all" :: Nil =>
              // すべてのキーの値を取得
              println("  [Get All]")
              keys foreach { k =>
                println(s"  Key:$k => ${cache.get(k)}")
              }
            case "clear" :: Nil =>
              // キャッシュクリア
              println("  [Clear]")
              cache.clear()
            case "members" :: Nil =>
              // クラスタのメンバー一覧を表示
              println("  [Cluster Members]")
              rpc.getMembers.asScala.foreach(m => println(s"  $m"))
            case "self" :: Nil =>
              // 自分自身を表示
              println("  [Self]")
              println(s"  ${manager.getAddress}")
            case "locate" :: Nil =>
              println("  [Entry Locate]")
              keys.foreach { key =>
                println(s"  Key:$key, PrimaryLocation => ${dm.getPrimaryLocation(key)} , Locate => ${dm.locate(key)}")
              }
            case "capacity-factors" :: Nil =>
              val ch =
                advancedCache
                  .getDistributionManager
                  .getConsistentHash
                  .asInstanceOf[org.infinispan.distribution.ch.DefaultConsistentHash]
              println("  [Capacity Factors]")
              println(s"  ${ch.getCapacityFactors}")
            case unknown => println(s"Unkwon Command[$unknown]")
          }
        }

      println("Exit NamedCacheClient.")

      cache.stop()
    } finally {
      manager.stop()
    }
  }
}

簡単なインタラクティブなコマンドを実行できるようにしてあり、クラスタのメンバー一覧やcapacityFactorの状況、キーの分散状況などが見れます。

動かしてみる

とりあえず、実行してみますが

> runMain NamedCacheClient

capacityFactorが0のNodeしかいないクラスタでは、起動に失敗します。

[error] java.lang.IllegalArgumentException: There must be at least one node with a non-zero capacity factor
[error] 	at org.infinispan.distribution.ch.DefaultConsistentHashFactory.checkCapacityFactors(DefaultConsistentHashFactory.java:56)

なので、先にサーバを起動します。

$ sbt "runMain NamedCacheStoreServer"

再度、クライアントを実行。

> runMain NamedCacheClient

今度は、クラスタに入れます。

クラスタのメンバー一覧を見てみます。

members
[info]   [Cluster Members]
[info]   xxxxx-15390
[info]   xxxxx-29294

ちなみに、クライアント自身は

self
[info]   [Self]
[info]   xxxxx-29294

という名前になっています。

ここで、capacityFactorを見てみます。

capacity-factors
[info]   [Capacity Factors]
[info]   {xxxxx-15390=1.0, xxxxx-29294=0.0}

クライアント側が0.0、サーバ側が1.0となっていることが確認できます。
*DefaultConsistentHash#getCapacityFactorsで確認しています

では、データを登録してみましょう。

put-all
[info]   [Put Cache Value]
[info]   Key:key1 => value1
[info]   Key:key2 => value2
[info]   Key:key3 => value3
[info]   Key:key4 => value4
[info]   Key:key5 => value5
[info]   Key:key6 => value6
[info]   Key:key7 => value7
[info]   Key:key8 => value8
[info]   Key:key9 => value9
[info]   Key:key10 => value10

キーの分散状況を見ると

locate
[info]   [Entry Locate]
[info]   Key:key1, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]
[info]   Key:key2, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]
[info]   Key:key3, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]
[info]   Key:key4, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]
[info]   Key:key5, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]
[info]   Key:key6, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]
[info]   Key:key7, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]
[info]   Key:key8, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]
[info]   Key:key9, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]
[info]   Key:key10, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]

サーバ側にすべてのデータが割り当てられ、クライアント側はデータを保持していません。

なので、1度クライアントを終了して

exit
[info] Exit NamedCacheClient.

もう1度クライアントを実行して

> runMain NamedCacheClient

データを見てみると、すべてのデータが残っていることが確認できます。

get-all
[info]   [Get All]
[info]   Key:key1 => value1
[info]   Key:key2 => value2
[info]   Key:key3 => value3
[info]   Key:key4 => value4
[info]   Key:key5 => value5
[info]   Key:key6 => value6
[info]   Key:key7 => value7
[info]   Key:key8 => value8
[info]   Key:key9 => value9
[info]   Key:key10 => value10

numOwnersは1なので、データはロストしていないということです。

というわけで

capacityFactorを利用することで、Node間でデータの保持状況に偏りを持たせられることを確認できました。

個人的にはcapacityFactorを0にするケースについても活用しどころがあって、非常に興味深いところです。

今回作成したコードは、こちらにアップしています。

https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-heterogeneous-cluster

バージョンでの、苦労の跡と一緒に…。

過去の履歴)
実は、capacityFactorの機能が最初に追加されたInfinispan 6.0.0.Finalには、capacityFactorが効かないというバグがありました。

このことをここで言及していると、@nekopさんに見つけていただいてIssueを切ってもらえました。ありがとうございます。
ISPN-3752
https://issues.jboss.org/browse/ISPN-3752

で、すでに解決済みなのですが、せっかくなので過去の履歴として試行錯誤の結果も残しておきます。

Infinispan 6.0.0.Finalの頃は、どうも最初に起動したcapacityFactorが0以外のNodeの値に、他のNodeのcapacityFactorが引っ張られているような動きをしていました。

例えば、最初に起動するNodeのcapacityFactorを2にすると、そこから起動するNodeのcapacityFactorがみんな2になるように見えるわけです。

最初は諦めようかなぁとも思いましたが、この機能はなんとか使ってみたくてデバッグ

そして、コードを取得。

$ git clone https://github.com/infinispan/infinispan.git

で、なんか怪しそうな箇所がひとつ見つけまして…。

Infinispanのソースより…
infinispan/core/src/main/java/org/infinispan/topology/ClusterCacheStatus.java

   public boolean addMember(Address joiner, float capacityFactor) {
      synchronized (this) {
         if (members.contains(joiner)) {
            if (trace) log.tracef("Trying to add node %s to cache %s, but it is already a member: " +
                  "members = %s, joiners = %s", joiner, cacheName, members, joiners);
            return false;
         }

         HashMap<Address, Float> newCapacityFactors = new HashMap<Address, Float>(capacityFactors);
         newCapacityFactors.put(joiner, joinInfo.getCapacityFactor());
         capacityFactors = Immutables.immutableMapWrap(newCapacityFactors);
         members = immutableAdd(members, joiner);
         joiners = immutableAdd(joiners, joiner);
         if (trace) log.tracef("Added joiner %s to cache %s: members = %s, joiners = %s", joiner, cacheName,
               members, joiners);
         return true;
      }
   }

このaddMemberメソッドが、クラスタのメンバー参加時に呼ばれているようなのですが、引数のcapacityFactor使ってないのでは?

というわけで、こうひっくり返してみました。

         //newCapacityFactors.put(joiner, joinInfo.getCapacityFactor());
         newCapacityFactors.put(joiner, capacityFactor);

で、ビルド。

$ cd infinispan/core
$ mvn package -Dmaven.test.skip=true

targetディレクトリに、infinispan-core.jarができます。

$ ll target/infinispan-core.jar  
-rw-rw-r-- 1 xxxxx xxxxx 2025150 1121 23:19 target/infinispan-core.jar

できたJARファイルを、先ほどのプロジェクトにlibディレクトリを作成して放り込みます。

$ mkdir lib
$ cp /path/to/infinispan-core.jar lib

build.sbtの依存関係を変更して
*この時は、Infinispan 6.0.0.Final

  "org.infinispan" % "infinispan-commons" % "6.0.0.Final" excludeAll(
  //"org.infinispan" % "infinispan-core" % "6.0.0.Final" excludeAll(

サーバ側から再実行。

$ sbt "runMain NamedCacheStoreServer"

そして、クライアントも実行。

> runMain NamedCacheClient

…ここから先は、最初に書いたようにクライアントを終了しても、データがひとつも欠落しないことを確認できています。

このデバッグには、Bytemanを活用しました。

いろいろ苦労しましたけど、よい経験になりました〜。