This time, I'll run Infinispan embedded in the same JavaVM as the application and try sharing the cache between applications.
This is so-called clustering, and there seem to be two modes:
- distribution
- replication
Let's start with build.sbt.
name := "infinispan-embedded-clustering"

version := "0.0.1"

scalaVersion := "2.10.0"

organization := "littlewings"

fork in run := true

resolvers += "jboss repository" at "http://repository.jboss.org/nexus/content/groups/public-jboss/"

libraryDependencies += "org.infinispan" % "infinispan-core" % "5.2.0.Final"
Next, the source code.
src/main/scala/EmbeddedClusteredClient.scala
import org.infinispan.manager.DefaultCacheManager

object EmbeddedClusteredClient {
  def main(args: Array[String]): Unit = {
    // The first argument is this node's own name, the rest are the peer nodes' names
    val (selfName, pairNames) = args.toList match {
      case s :: restNames => (s, restNames)
      case _ => sys.exit(1)
    }

    new EmbeddedClusteredClient(selfName, pairNames).run
  }
}

class EmbeddedClusteredClient(val selfName: String, val pairNames: List[String]) {
  def run(): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]()
    cache.addListener(new LoggingListener)

    try {
      val range = 1 to 5
      val keys = range.map(k => key(k.toString))
      val values = range.map(v => s"value$v")
      val pairNamesKeys = pairNames.flatMap(pn => range.map(k => key(pn, k.toString)))

      // Put this node's own entries
      (keys zip values) foreach { case (k, v) => cache.put(k, v) }

      // Give the other nodes time to put their entries
      println("Sleeping...")
      Thread.sleep(10000L)

      // Read back both this node's entries and the peers' entries
      keys foreach (k => log(s"$k => ${cache.get(k)}"))
      pairNamesKeys foreach (k => log(s"$k => ${cache.get(k)}"))
    } finally {
      // Stop explicitly, otherwise the clustering threads keep the JVM alive
      cache.stop()
      manager.stop()
    }
  }

  def key(seed: String): String = key(selfName, seed)

  def key(name: String, seed: String): String = s"${name}:${seed}"

  def log(message: String): Unit = println(s"[$selfName] received, $message")
}
The program takes pseudo node names as arguments: the first is its own name, and the rest are the names of the peer nodes it works with.
Unlike the previous examples, code like this is included:
cache.stop()
manager.stop()
When using clustering, you have to stop things explicitly like this or the JavaVM will not exit, because the non-daemon threads used for clustering keep running.
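As an extra safety net (this is just my own sketch, not something from the Infinispan docs), you could also register a JVM shutdown hook so the manager gets stopped even if the program leaves through a path that skips the try/finally:

import org.infinispan.manager.DefaultCacheManager

object ShutdownHookSketch {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")

    // Tear down the clustering (non-daemon) threads even if the normal
    // shutdown path is skipped; without this the JVM would not exit
    sys.addShutdownHook {
      manager.stop()
    }

    val cache = manager.getCache[String, String]()
    cache.put("key", "value")
    println(cache.get("key"))
    // When main returns, the hook stops the manager and the JVM can exit
  }
}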
To print cache events to the console, a Listener is attached.
src/main/scala/LoggingListener.scala
import org.infinispan.notifications.Listener
import org.infinispan.notifications.cachelistener.annotation.{CacheEntryCreated, CacheEntryRemoved}
import org.infinispan.notifications.cachelistener.event.{CacheEntryCreatedEvent, CacheEntryRemovedEvent}

@Listener
class LoggingListener {
  @CacheEntryCreated
  def observeAdd(event: CacheEntryCreatedEvent[_, _]): Unit =
    // isPre is true for the notification fired before the entry is actually created
    if (!event.isPre)
      println(s"Cache entry with key [${event.getKey}] added in cache [${event.getCache}]")
    else ()

  @CacheEntryRemoved
  def observeRemoved(event: CacheEntryRemovedEvent[_, _]): Unit =
    println(s"Cache entry with key [${event.getKey}] removed in cache [${event.getCache}]")
}
I prepared two configuration files.
First, the main one.
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:5.2 http://www.infinispan.org/schemas/infinispan-config-5.2.xsd"
    xmlns="urn:infinispan:config:5.2">
  <global>
    <transport clusterName="demoCluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics enabled="true"/>
  </global>

  <default>
    <jmxStatistics enabled="true"/>
    <clustering mode="distribution">
    <!-- <clustering mode="replication"> -->
      <hash numOwners="2" />
      <sync />
    </clustering>
  </default>
</infinispan>
The cluster configuration is done in this part:
<clustering mode="distribution">
<!-- <clustering mode="replication"> -->
  <hash numOwners="2" />
  <sync />
</clustering>
As far as I understand, the settings mean roughly the following:
- mode attribute of the clustering tag → whether to use a distributed cache or replication (distribution here)
- numOwners attribute of the hash tag → how many nodes keep a copy of each piece of data (2 here)
- sync tag → synchronous cache transfers
The meaning of the sync tag is, I believe, described here:
https://access.redhat.com/knowledge/docs/ja-JP/JBoss_Data_Grid/6/html-single/Administration_and_Configuration_Guide/index.html#Asynchronous_and_Synchronous_Operations
The meaning of numOwners is probably covered here:
7.4.4.5. Configure the Distributed Data Grid
https://access.redhat.com/knowledge/docs/ja-JP/JBoss_Data_Grid/6/html-single/Getting_Started_Guide/index.html#sect-Configure_the_Cluster
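As an aside, the same clustering settings can apparently also be assembled in code instead of infinispan.xml, using GlobalConfigurationBuilder / ConfigurationBuilder. I haven't tried this exact snippet, so treat it as a rough sketch of what the XML above corresponds to:

import org.infinispan.configuration.cache.{CacheMode, ConfigurationBuilder}
import org.infinispan.configuration.global.GlobalConfigurationBuilder
import org.infinispan.manager.DefaultCacheManager

// Roughly the <transport clusterName="demoCluster"> part
val globalConfig = GlobalConfigurationBuilder.defaultClusteredBuilder()
  .transport().defaultTransport().clusterName("demoCluster")
  .addProperty("configurationFile", "jgroups.xml")
  .build()

// Roughly the <clustering mode="distribution"> part
val config = new ConfigurationBuilder()
  .clustering()
    .cacheMode(CacheMode.DIST_SYNC) // distribution + <sync />
    .hash().numOwners(2)            // <hash numOwners="2" />
  .build()

val manager = new DefaultCacheManager(globalConfig, config)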
Next, src/main/resources/jgroups.xml.
<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd">
   <UDP
        mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}"
        mcast_port="${jgroups.udp.mcast_port:46655}"
        tos="8"
        ucast_recv_buf_size="100000"
        ucast_send_buf_size="100000"
        mcast_recv_buf_size="100000"
        mcast_send_buf_size="100000"
        loopback="true"
        max_bundle_size="64000"
        max_bundle_timeout="30"
        ip_ttl="${jgroups.udp.ip_ttl:2}"
        enable_bundling="true"
        enable_unicast_bundling="true"
        enable_diagnostics="false"
        thread_naming_pattern="cl"

        thread_pool.enabled="true"
        thread_pool.min_threads="2"
        thread_pool.max_threads="4"
        thread_pool.keep_alive_time="5000"
        thread_pool.queue_enabled="true"
        thread_pool.queue_max_size="100"
        thread_pool.rejection_policy="discard"

        oob_thread_pool.enabled="true"
        oob_thread_pool.min_threads="2"
        oob_thread_pool.max_threads="4"
        oob_thread_pool.keep_alive_time="5000"
        oob_thread_pool.queue_enabled="false"
        oob_thread_pool.queue_max_size="100"
        oob_thread_pool.rejection_policy="discard"
        />

   <PING timeout="3000" num_initial_members="2"/>
   <MERGE2 max_interval="30000" min_interval="10000"/>
   <FD_SOCK/>
   <FD_ALL/>
   <VERIFY_SUSPECT timeout="1500"/>
   <BARRIER />
   <pbcast.NAKACK use_mcast_xmit="true"
                  retransmit_timeout="300,600,1200"
                  discard_delivered_msgs="true"/>
   <UNICAST2 />
   <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="1000000"/>
   <pbcast.GMS print_local_addr="true" join_timeout="3000" max_bundling_time="200" view_bundling="true"/>
   <UFC max_credits="500000" min_threshold="0.20"/>
   <MFC max_credits="500000" min_threshold="0.20"/>
   <FRAG2 frag_size="60000" />
</config>
I barely understand what this configuration means yet... Even without this file it starts with default settings, but the buffer sizes of the OS I'm using don't match the JGroups defaults at all and it spews warnings, so I threw the settings together from the sample bundled with Infinispan and this one:
https://github.com/infinispan/infinispan-quickstart/blob/master/clustered-cache/src/main/resources/jgroups.xml
I'll look into it properly sometime soon...
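Incidentally, the ${jgroups.udp.mcast_addr:228.6.7.8}-style values in jgroups.xml are JGroups property substitutions, so the multicast address and port should be overridable via system properties. Since build.sbt has fork in run := true, something like the following in build.sbt ought to work (untested; the values here are just the same defaults):

// build.sbt (sketch): pass JGroups overrides to the forked run JVM
javaOptions in run ++= Seq(
  "-Djgroups.udp.mcast_addr=228.6.7.8",
  "-Djgroups.udp.mcast_port=46655"
)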
Now, let's run it. Start two sbt consoles and execute the following in each.
# Console 1 (node1)
> run node1 node2

# Console 2 (node2)
> run node2 node1
Kick off console 1 and console 2 in quick succession.
Console 1 (node1)
> run node1 node2
[info] Running EmbeddedClusteredClient node1 node2
[error] 2 09, 2013 8:53:32 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start
[error] INFO: ISPN000078: Starting JGroups Channel
[info]
[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-23062, cluster=demoCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:56051
[info] -------------------------------------------------------------------
[error] 2 09, 2013 8:53:37 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-23062|0] [ubuntu-23062]
[error] 2 09, 2013 8:53:37 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport startJGroupsChannelIfNeeded
[error] INFO: ISPN000079: Cache local address is ubuntu-23062, physical addresses are [fe80:0:0:0:20c:29ff:fe5c:cfec%2:56051]
[error] 2 09, 2013 8:53:37 午後 org.infinispan.factories.GlobalComponentRegistry start
[error] INFO: ISPN000128: Infinispan version: Infinispan 'Delirium' 5.2.0.Final
[error] 2 09, 2013 8:53:37 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-23062|1] [ubuntu-23062, ubuntu-14783]
[error] 2 09, 2013 8:53:37 午後 org.infinispan.jmx.CacheJmxRegistration start
[error] INFO: ISPN000031: MBeans were successfully registered to the platform MBean server.
[info] Cache entry with key [node1:1] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node1:2] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node1:3] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node1:4] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node1:5] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Sleeping...
[info] Cache entry with key [node2:1] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node2:2] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node2:3] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node2:4] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node2:5] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] [node1] received, node1:1 => value1
[info] [node1] received, node1:2 => value2
[info] [node1] received, node1:3 => value3
[info] [node1] received, node1:4 => value4
[info] [node1] received, node1:5 => value5
[info] [node1] received, node2:1 => value1
[info] [node1] received, node2:2 => value2
[info] [node1] received, node2:3 => value3
[info] [node1] received, node2:4 => value4
[info] [node1] received, node2:5 => value5
[error] 2 09, 2013 8:53:48 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop
[error] INFO: ISPN000080: Disconnecting and closing JGroups Channel
[error] 2 09, 2013 8:53:48 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop
[error] INFO: ISPN000082: Stopping the RpcDispatcher
[success] Total time: 19 s, completed 2013/02/09 20:53:48
Console 2 (node2)
> run node2 node1
[info] Running EmbeddedClusteredClient node2 node1
[error] 2 09, 2013 8:53:33 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start
[error] INFO: ISPN000078: Starting JGroups Channel
[info]
[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-14783, cluster=demoCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:59560
[info] -------------------------------------------------------------------
[error] 2 09, 2013 8:53:37 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-23062|1] [ubuntu-23062, ubuntu-14783]
[error] 2 09, 2013 8:53:38 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport startJGroupsChannelIfNeeded
[error] INFO: ISPN000079: Cache local address is ubuntu-14783, physical addresses are [fe80:0:0:0:20c:29ff:fe5c:cfec%2:59560]
[error] 2 09, 2013 8:53:38 午後 org.infinispan.factories.GlobalComponentRegistry start
[error] INFO: ISPN000128: Infinispan version: Infinispan 'Delirium' 5.2.0.Final
[error] 2 09, 2013 8:53:38 午後 org.infinispan.jmx.CacheJmxRegistration start
[error] INFO: ISPN000031: MBeans were successfully registered to the platform MBean server.
[info] Cache entry with key [node2:1] added in cache [Cache '___defaultcache'@ubuntu-14783]
[info] Cache entry with key [node2:2] added in cache [Cache '___defaultcache'@ubuntu-14783]
[info] Cache entry with key [node2:3] added in cache [Cache '___defaultcache'@ubuntu-14783]
[info] Cache entry with key [node2:4] added in cache [Cache '___defaultcache'@ubuntu-14783]
[info] Cache entry with key [node2:5] added in cache [Cache '___defaultcache'@ubuntu-14783]
[info] Sleeping...
[error] 2 09, 2013 8:53:48 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-14783|2] [ubuntu-14783]
[info] [node2] received, node2:1 => value1
[info] [node2] received, node2:2 => value2
[info] [node2] received, node2:3 => value3
[info] [node2] received, node2:4 => value4
[info] [node2] received, node2:5 => value5
[info] [node2] received, node1:1 => value1
[info] [node2] received, node1:2 => value2
[info] [node2] received, node1:3 => value3
[info] [node2] received, node1:4 => value4
[info] [node2] received, node1:5 => value5
[error] 2 09, 2013 8:53:48 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop
[error] INFO: ISPN000080: Disconnecting and closing JGroups Channel
[error] 2 09, 2013 8:53:48 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop
[error] INFO: ISPN000082: Stopping the RpcDispatcher
[success] Total time: 19 s, completed 2013/02/09 20:53:49
At any rate, both nodes can see the same data.
# node1
[info] [node1] received, node1:1 => value1
[info] [node1] received, node1:2 => value2
[info] [node1] received, node1:3 => value3
[info] [node1] received, node1:4 => value4
[info] [node1] received, node1:5 => value5
[info] [node1] received, node2:1 => value1
[info] [node1] received, node2:2 => value2
[info] [node1] received, node2:3 => value3
[info] [node1] received, node2:4 => value4
[info] [node1] received, node2:5 => value5

# node2
[info] [node2] received, node2:1 => value1
[info] [node2] received, node2:2 => value2
[info] [node2] received, node2:3 => value3
[info] [node2] received, node2:4 => value4
[info] [node2] received, node2:5 => value5
[info] [node2] received, node1:1 => value1
[info] [node2] received, node1:2 => value2
[info] [node2] received, node1:3 => value3
[info] [node2] received, node1:4 => value4
[info] [node2] received, node1:5 => value5
Log lines like the following, which appear along the way,
[info] Cache entry with key [node1:1] added in cache [Cache '___defaultcache'@ubuntu-23062]
are emitted by the Listener I attached. On node1's side, the entries added by node2 also show up in this log. (On node2's side, no such lines appear for node1's entries, though...)
In the end, both nodes can see the same data, so I suppose the cache is being shared, more or less? Well, since numOwners is 2, both nodes probably hold copies of the same data anyway.
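If you want to check which nodes actually own a key, rather than inferring it from numOwners, I believe the AdvancedCache's DistributionManager can tell you. Untested, and the locate call is my assumption about the 5.2 API:

// Sketch: ask which nodes own a given key (distribution mode only;
// getDistributionManager returns null for non-distributed caches)
val advancedCache = cache.getAdvancedCache
val owners = advancedCache.getDistributionManager.locate("node1:1")
println(s"node1:1 is owned by: $owners")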
Switching to replication mode doesn't change the observable behavior, so I'll skip that.
Incidentally, with the configuration left as-is, if you try to use three or more nodes, like so:
> run node1 node2 node3
> run node2 node1 node3
> run node3 node1 node2
then on one of the nodes you end up seeing errors like
[error] WARN: ISPN000071: Caught exception when handling command CacheTopologyControlCommand{cache=___defaultcache, type=REBALANCE_CONFIRM, sender=ubuntu-4378, joinInfo=null, topologyId=6, currentCH=null, pendingCH=null, throwable=null, viewId=2}
[error] org.infinispan.CacheException: Received invalid rebalance confirmation from ubuntu-4378 for cache ___defaultcache, we don't have a rebalance in progress
...
[error] ERROR: Failed to start rebalance: null
[error] java.lang.InterruptedException
or
[error] ERROR: Failed to start rebalance: org.infinispan.CacheException: Remote (ubuntu-18047) failed unexpectedly
[error] java.util.concurrent.ExecutionException: org.infinispan.CacheException: Remote (ubuntu-18047) failed unexpectedly
and so on.
They stop appearing if you switch the clustering mode to replication or set numOwners to 3, so my guess is that when a node that held some data exits first, there is nowhere left to move its data to, and the rebalance can no longer complete?
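One workaround I can imagine (my own untested sketch, not something from the docs) is to wait until the expected number of members have joined before writing, so no node starts putting data or exits while others are still joining, using EmbeddedCacheManager#getMembers:

import org.infinispan.manager.EmbeddedCacheManager

// Sketch: block until the cluster view has the expected number of members,
// so no node exits while a rebalance for a late joiner is still in flight
def waitForClusterSize(manager: EmbeddedCacheManager, expected: Int, timeoutMillis: Long = 30000L): Unit = {
  val deadline = System.currentTimeMillis + timeoutMillis
  // getMembers reflects the current JGroups view once a clustered cache is running
  while (manager.getMembers.size < expected && System.currentTimeMillis < deadline) {
    Thread.sleep(500L)
  }
}

// e.g. for the three-node case: waitForClusterSize(manager, 3)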