2014/4/22)
Issue ISPN-3752での修正後の確認結果に更新しました。
このエントリは、Infinispan 6.0.2.Finalで書き直しています。
Infinispan 6.0.0.Finalがリリースされました。けっこうリリースを楽しみにしていたのですが、その中でもちょっと気になっていたのがこちらの機能追加。
Heterogenous clusters with Infinispan 6.0.0.Beta1
http://blog.infinispan.org/2013/09/heterogenous-clusters-with-infinispan.html
Allow configuring the number of segments per node(ISPN-3051)
https://issues.jboss.org/browse/ISPN-3051
Distributionクラスタを組む時に、各Nodeが持つデータに偏りを持たせることができます。これは、capacityFactorという属性値で設定します。デフォルト値は、1.0でfloatで指定するようです。
例えば、あるNodeでは
<namedCache name="capacity_factors"> <clustering mode="distribution"> <hash capacityFactor="2.0"/> </clustering> </namedCache>
と指定することで、デフォルトの1.0と指定されたNodeの2倍のデータを持ちます。また、あるNodeでは
<namedCache name="capacity_factors"> <clustering mode="distribution"> <hash capacityFactor="0.5"/> </clustering> </namedCache>
と指定することでデフォルトのノードの半分となるそうな。
特に、特徴的なのが
<namedCache name="capacity_factors"> <clustering mode="distribution"> <hash capacityFactor="0"/> </clustering> </namedCache>
と指定するパターンで、これはデータをまったく持たないことを意味します。
Oracle Coherenceでいう「tangosol.coherence.distributed.localstorage」をfalseに指定するようなもので、実際そのようなことが先のISPN-3051にも書かれています。
つまり、これまでEmbedded Cache(ライブラリモード)であれば、データが各Nodeにそれぞれ分配されてしまっていたのですが、capacityFactorを指定することで、以下のようなことが可能になります。
- capacityFactorの値を調整することで、Node(というかマシン)のスペックに応じた、データ量の持つようにする
- capacityFactorを0のNodeをクラスタに参加させることで、データを持たないクライアント/サーバ的な構成をとる
特にcapacityFactorが0のNodeは特徴的で、これを利用することで高機能なEmbedded Cacheとして使いつつ、Hot Rodのようなクライアント/サーバの構成を取れるというのはかなり面白いと思います。L1 Cacheも組み合わせると、さらに幅が広がりそうですね。
もちろん、Hot Rodとの使い分けはケースバイケースでしょうし、capacityFactorが0だからといって頻繁にクラスタに参加/離脱をしたりするのもよくないと思われるので、そのあたりはきっと注意することに。
というわけで、6.0.0.Finalが出たのに気付いてから、ちょっと試してみました。が、…リリース当時はこの機能にバグがあり、修正後再度確認しました。
今回は、capacityFactorが0のクライアントと、capacityFactorが1のサーバを構成する感じで作ってみたいと思います。
準備
まずは、build.sbt。アルファ版の頃から、sbtでの依存関係の解決が変な動きをしていると思っていましたが、やっぱりFinalでも発生しました。
結果、こんなbuild.sbtになりました。
*エントリの書き直しの結果、Scalaは2.11.0になりました
name := "infinispan-heterogeneous-cluster" version := "0.0.1-SNAPSHOT" scalaVersion := "2.11.0" organization := "littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked") fork in run := true connectInput in run := true // resolvers += "Public JBoss Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss" libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "6.0.2.Final" excludeAll( ExclusionRule(organization = "org.jgroups", name = "jgroups"), ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"), ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"), ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"), ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec") ), "org.jgroups" % "jgroups" % "3.4.1.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final", "org.jboss.marshalling" % "jboss-marshalling-river" % "1.4.4.Final", "org.jboss.marshalling" % "jboss-marshalling" % "1.4.4.Final", "org.jboss.logging" % "jboss-logging" % "3.1.2.GA", "net.jcip" % "jcip-annotations" % "1.0" )
そう、なんとJBoss系のライブラリがほとんど解決できていらっしゃらない(笑)。infinispan-coreを指定すると、他に6つライブラリを取得しようとするんですけど、うち5つが解決できませんでした…。あと、jcip-annotationsはScalaでInfinispanを使用する場合は、明示的に指定する必要があります。
ちなみに、
"org.infinispan" % "infinispan-core" % "6.0.2.Final"
だけにすると、こうなります。
[warn] :::::::::::::::::::::::::::::::::::::::::::::: [warn] :: UNRESOLVED DEPENDENCIES :: [warn] :::::::::::::::::::::::::::::::::::::::::::::: [warn] :: org.jboss.logging#jboss-logging;${version.jboss.logging}: not found [warn] :: org.jgroups#jgroups;${version.jgroups}: not found [warn] :: org.jboss.spec.javax.transaction#jboss-transaction-api_1.1_spec;${version.jta}: not found [warn] :: org.jboss.marshalling#jboss-marshalling-river;${version.jboss.marshalling}: not found [warn] :: org.jboss.marshalling#jboss-marshalling;${version.jboss.marshalling}: not found [warn] :::::::::::::::::::::::::::::::::::::::::::::: [trace] Stack trace suppressed: run last *:update for the full output. [error] (*:update) sbt.ResolveException: unresolved dependency: org.jboss.logging#jboss-logging;${version.jboss.logging}: not found [error] unresolved dependency: org.jgroups#jgroups;${version.jgroups}: not found [error] unresolved dependency: org.jboss.spec.javax.transaction#jboss-transaction-api_1.1_spec;${version.jta}: not found [error] unresolved dependency: org.jboss.marshalling#jboss-marshalling-river;${version.jboss.marshalling}: not found [error] unresolved dependency: org.jboss.marshalling#jboss-marshalling;${version.jboss.marshalling}: not found [error] Total time: 4 s, completed 2013/11/22 1:09:17
続いて、設定ファイル。
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd" xmlns="urn:infinispan:config:6.0"> <global> <transport clusterName="heterogeneous-cluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" /> <shutdown hookBehavior="REGISTER"/> </global> <default /> <namedCache name="heterogeneous-cache"> <clustering mode="distribution"> <hash numOwners="1" capacityFactor="${capacity.factor:0}" /> </clustering> </namedCache> </infinispan>
capacityFactorは、「capacity.factor」というシステムプロパティで値を指定した場合はその値を使い、そうでない場合は0をデフォルト値とするようにしました。また、データの偏りがわかりやすいように、numOwnersは1としています。
続いて、JGroupsの設定。
src/main/resources/jgroups.xml
<?xml version="1.0" encoding="UTF-8"?> <config xmlns="urn:org:jgroups" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.4.xsd"> <UDP mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}" mcast_port="${jgroups.udp.mcast_port:45688}" tos="8" ucast_recv_buf_size="130000" ucast_send_buf_size="100000" mcast_recv_buf_size="130000" mcast_send_buf_size="100000" loopback="true" max_bundle_size="31k" ip_ttl="${jgroups.udp.ip_ttl:2}" enable_diagnostics="false" bundler_type="old" thread_naming_pattern="" thread_pool.enabled="true" thread_pool.min_threads="2" thread_pool.max_threads="8" thread_pool.keep_alive_time="60000" thread_pool.queue_enabled="true" thread_pool.queue_max_size="100" thread_pool.rejection_policy="Discard" oob_thread_pool.enabled="true" oob_thread_pool.min_threads="2" oob_thread_pool.max_threads="8" oob_thread_pool.keep_alive_time="60000" oob_thread_pool.queue_enabled="false" oob_thread_pool.queue_max_size="100" oob_thread_pool.rejection_policy="Discard" internal_thread_pool.enabled="true" internal_thread_pool.min_threads="1" internal_thread_pool.max_threads="8" internal_thread_pool.keep_alive_time="60000" internal_thread_pool.queue_enabled="true" internal_thread_pool.queue_max_size="100" internal_thread_pool.rejection_policy="Discard" /> <PING timeout="3000" num_initial_members="2" /> <MERGE2 max_interval="30000" min_interval="10000"/> <FD_SOCK /> <FD_ALL timeout="15000" interval="3000" /> <VERIFY_SUSPECT timeout="1500"/> <pbcast.NAKACK2 xmit_interval="1000" xmit_table_num_rows="100" xmit_table_msgs_per_row="10000" xmit_table_max_compaction_time="10000" max_msg_batch_size="100"/> <UNICAST3 xmit_interval="500" xmit_table_num_rows="20" xmit_table_msgs_per_row="10000" xmit_table_max_compaction_time="10000" max_msg_batch_size="100" conn_expiry_timeout="0"/> <pbcast.STABLE stability_delay="500" desired_avg_gossip="5000" max_bytes="1m"/> <pbcast.GMS print_local_addr="true" join_timeout="3000" view_bundling="true"/> <!-- <tom.TOA/> --> <!-- the TOA is only needed for total order transactions--> <UFC max_credits="2m" min_threshold="0.40"/> <MFC max_credits="2m" min_threshold="0.40"/> <FRAG2 frag_size="30k" /> <RSVP timeout="60000" resend_interval="500" ack_on_delivery="false" /> </config>
Infinispanが依存しているJGroupsが3.4.1.Finalに上がったことと、現在のInfinispanのデフォルト設定を見ながら小さめの値で作ってみました。
サーバ
サーバ側のプログラム。
src/main/scala/NamedCacheStoreServer.scala
import org.infinispan.manager.DefaultCacheManager import scala.io.StdIn object NamedCacheStoreServer { def main(args: Array[String]): Unit = { System.setProperty("capacity.factor", "1.0") val manager = new DefaultCacheManager("infinispan.xml") try { val cache = manager.getCache[String, String]("heterogeneous-cache") StdIn.readLine("NamedCacheStoreManager Startup.") cache.stop() } finally { manager.stop() } } }
単純に、浮いていてもらうようのプログラムです。最初にシステムプロパティを設定していて、capacityFactorが1.0になるようにしています。
そして、クライアント側。こちらは、capacityFactorが設定ファイルのデフォルト値である0となります。
src/main/scala/NamedCacheClient.scala
import scala.collection.JavaConverters._ import scala.io.StdIn import org.infinispan.distribution.ch.DefaultConsistentHash import org.infinispan.manager.DefaultCacheManager object NamedCacheClient { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") try { val cache = manager.getCache[String, String]("heterogeneous-cache") val advancedCache = cache.getAdvancedCache val dm = advancedCache.getDistributionManager val rpc = advancedCache.getRpcManager val (keys, values) = (1 to 10).map { i => (s"key$i", s"value$i") }.unzip Iterator .continually(StdIn.readLine()) .takeWhile(_ != "exit") .withFilter(l => l != null & !l.isEmpty) .foreach { command => command.split("\\s").toList match { case "put-all" :: Nil => // データ登録 println(" [Put Cache Value]") keys.zip(values).foreach { case (k, v) => println(s" Key:$k => $v") cache.put(k, v) } case "get" :: key :: Nil => // 特定のキーの値を取得 println(" [Get]") println(s" Key:$key => ${cache.get(key)}") case "get-all" :: Nil => // すべてのキーの値を取得 println(" [Get All]") keys foreach { k => println(s" Key:$k => ${cache.get(k)}") } case "clear" :: Nil => // キャッシュクリア println(" [Clear]") cache.clear() case "members" :: Nil => // クラスタのメンバー一覧を表示 println(" [Cluster Members]") rpc.getMembers.asScala.foreach(m => println(s" $m")) case "self" :: Nil => // 自分自身を表示 println(" [Self]") println(s" ${manager.getAddress}") case "locate" :: Nil => println(" [Entry Locate]") keys.foreach { key => println(s" Key:$key, PrimaryLocation => ${dm.getPrimaryLocation(key)} , Locate => ${dm.locate(key)}") } case "capacity-factors" :: Nil => val ch = advancedCache .getDistributionManager .getConsistentHash .asInstanceOf[org.infinispan.distribution.ch.DefaultConsistentHash] println(" [Capacity Factors]") println(s" ${ch.getCapacityFactors}") case unknown => println(s"Unkwon Command[$unknown]") } } println("Exit NamedCacheClient.") cache.stop() } finally { manager.stop() } } }
簡単なインタラクティブなコマンドを実行できるようにしてあり、クラスタのメンバー一覧やcapacityFactorの状況、キーの分散状況などが見れます。
動かしてみる
とりあえず、実行してみますが
> runMain NamedCacheClient
capacityFactorが0のNodeしかいないクラスタでは、起動に失敗します。
[error] java.lang.IllegalArgumentException: There must be at least one node with a non-zero capacity factor [error] at org.infinispan.distribution.ch.DefaultConsistentHashFactory.checkCapacityFactors(DefaultConsistentHashFactory.java:56)
なので、先にサーバを起動します。
$ sbt "runMain NamedCacheStoreServer"
再度、クライアントを実行。
> runMain NamedCacheClient
今度は、クラスタに入れます。
members [info] [Cluster Members] [info] xxxxx-15390 [info] xxxxx-29294
ちなみに、クライアント自身は
self [info] [Self] [info] xxxxx-29294
という名前になっています。
ここで、capacityFactorを見てみます。
capacity-factors [info] [Capacity Factors] [info] {xxxxx-15390=1.0, xxxxx-29294=0.0}
クライアント側が0.0、サーバ側が1.0となっていることが確認できます。
*DefaultConsistentHash#getCapacityFactorsで確認しています
では、データを登録してみましょう。
put-all [info] [Put Cache Value] [info] Key:key1 => value1 [info] Key:key2 => value2 [info] Key:key3 => value3 [info] Key:key4 => value4 [info] Key:key5 => value5 [info] Key:key6 => value6 [info] Key:key7 => value7 [info] Key:key8 => value8 [info] Key:key9 => value9 [info] Key:key10 => value10
キーの分散状況を見ると
locate [info] [Entry Locate] [info] Key:key1, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390] [info] Key:key2, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390] [info] Key:key3, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390] [info] Key:key4, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390] [info] Key:key5, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390] [info] Key:key6, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390] [info] Key:key7, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390] [info] Key:key8, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390] [info] Key:key9, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390] [info] Key:key10, PrimaryLocation => xxxxx-15390 , Locate => [xxxxx-15390]
サーバ側にすべてのデータが割り当てられ、クライアント側はデータを保持していません。
なので、1度クライアントを終了して
exit [info] Exit NamedCacheClient.
もう1度クライアントを実行して
> runMain NamedCacheClient
データを見てみると、すべてのデータが残っていることが確認できます。
get-all [info] [Get All] [info] Key:key1 => value1 [info] Key:key2 => value2 [info] Key:key3 => value3 [info] Key:key4 => value4 [info] Key:key5 => value5 [info] Key:key6 => value6 [info] Key:key7 => value7 [info] Key:key8 => value8 [info] Key:key9 => value9 [info] Key:key10 => value10
numOwnersは1なので、データはロストしていないということです。
というわけで
capacityFactorを利用することで、Node間でデータの保持状況に偏りを持たせられることを確認できました。
個人的にはcapacityFactorを0にするケースについても活用しどころがあって、非常に興味深いところです。
今回作成したコードは、こちらにアップしています。
https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-heterogeneous-cluster
旧バージョンでの、苦労の跡と一緒に…。
過去の履歴)
実は、capacityFactorの機能が最初に追加されたInfinispan 6.0.0.Finalには、capacityFactorが効かないというバグがありました。
このことをここで言及していると、@nekopさんに見つけていただいてIssueを切ってもらえました。ありがとうございます。
ISPN-3752
https://issues.jboss.org/browse/ISPN-3752
で、すでに解決済みなのですが、せっかくなので過去の履歴として試行錯誤の結果も残しておきます。
Infinispan 6.0.0.Finalの頃は、どうも最初に起動したcapacityFactorが0以外のNodeの値に、他のNodeのcapacityFactorが引っ張られているような動きをしていました。
例えば、最初に起動するNodeのcapacityFactorを2にすると、そこから起動するNodeのcapacityFactorがみんな2になるように見えるわけです。
最初は諦めようかなぁとも思いましたが、この機能はなんとか使ってみたくてデバッグ。
そして、コードを取得。
$ git clone https://github.com/infinispan/infinispan.git
で、なんか怪しそうな箇所がひとつ見つけまして…。
Infinispanのソースより…
infinispan/core/src/main/java/org/infinispan/topology/ClusterCacheStatus.java
public boolean addMember(Address joiner, float capacityFactor) { synchronized (this) { if (members.contains(joiner)) { if (trace) log.tracef("Trying to add node %s to cache %s, but it is already a member: " + "members = %s, joiners = %s", joiner, cacheName, members, joiners); return false; } HashMap<Address, Float> newCapacityFactors = new HashMap<Address, Float>(capacityFactors); newCapacityFactors.put(joiner, joinInfo.getCapacityFactor()); capacityFactors = Immutables.immutableMapWrap(newCapacityFactors); members = immutableAdd(members, joiner); joiners = immutableAdd(joiners, joiner); if (trace) log.tracef("Added joiner %s to cache %s: members = %s, joiners = %s", joiner, cacheName, members, joiners); return true; } }
このaddMemberメソッドが、クラスタのメンバー参加時に呼ばれているようなのですが、引数のcapacityFactor使ってないのでは?
というわけで、こうひっくり返してみました。
//newCapacityFactors.put(joiner, joinInfo.getCapacityFactor());
newCapacityFactors.put(joiner, capacityFactor);
で、ビルド。
$ cd infinispan/core $ mvn package -Dmaven.test.skip=true
targetディレクトリに、infinispan-core.jarができます。
$ ll target/infinispan-core.jar -rw-rw-r-- 1 xxxxx xxxxx 2025150 11月 21 23:19 target/infinispan-core.jar
できたJARファイルを、先ほどのプロジェクトにlibディレクトリを作成して放り込みます。
$ mkdir lib
$ cp /path/to/infinispan-core.jar lib
build.sbtの依存関係を変更して
*この時は、Infinispan 6.0.0.Final
"org.infinispan" % "infinispan-commons" % "6.0.0.Final" excludeAll( //"org.infinispan" % "infinispan-core" % "6.0.0.Final" excludeAll(
サーバ側から再実行。
$ sbt "runMain NamedCacheStoreServer"
そして、クライアントも実行。
> runMain NamedCacheClient
…ここから先は、最初に書いたようにクライアントを終了しても、データがひとつも欠落しないことを確認できています。
このデバッグには、Bytemanを活用しました。
いろいろ苦労しましたけど、よい経験になりました〜。