CLOVER🍀

That was when it all began.

Infinispan Embedded Cache+Cluster Cache

今度は、アプリケーションと同一のJavaVMでInfinispanを起動させ、なおかつアプリケーション間でのキャッシュの共有にトライしてみます。

いわゆるクラスタですが、モードには

があるようです。

では、build.sbtから。

name := "infinispan-embedded-clustering"

version := "0.0.1"

scalaVersion := "2.10.0"

organization := "littlewings"

fork in run := true

resolvers += "jboss repository" at "http://repository.jboss.org/nexus/content/groups/public-jboss/"

libraryDependencies += "org.infinispan" % "infinispan-core" % "5.2.0.Final"

続いて、ソースコード
src/main/scala/EmbeddedClusteredClient.scala

import org.infinispan.manager.DefaultCacheManager

object EmbeddedClusteredClient {
  def main(args: Array[String]): Unit = {
    val (selfName, pairNames) = args.toList match {
      case s :: restNames => (s, restNames)
      case _ => sys.exit(1)
    }
    new EmbeddedClusteredClient(selfName, pairNames).run
  }
}

class EmbeddedClusteredClient(val selfName: String, val pairNames: List[String]) {
  def run(): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]()
    cache.addListener(new LoggingListener)

    try {
      val range = 1 to 5
      val keys = range.map(k => key(k.toString))
      val values = range.map(v => s"value$v")
      val pairNamesKeys = pairNames.flatMap(pn => range.map(k => key(pn, k.toString)))

      (keys zip values) foreach { case (k, v) => cache.put(k, v) }

      println("Sleeping...")
      Thread.sleep(10000L)

      keys foreach (k => log(s"$k => ${cache.get(k)}"))
      pairNamesKeys foreach (k => log(s"$k => ${cache.get(k)}"))
    } finally {
      cache.stop()
      manager.stop()
    }
  }

  def key(seed: String): String = key(selfName, seed)
  def key(name: String, seed: String): String = s"${name}:${seed}"

  def log(message: String): Unit =
    println(s"[$selfName] received, $message")
}

引数に、仮のノード名を取るようにしています。ひとつ目は自分自身で、それ以外は連携する相手のノード名を指定します。

今までの例と違って、

      cache.stop()
      manager.stop()

みたいなコードが入っています。クラスタリングを使う場合は、これを入れて明示的に停止しないと、JavaVMが終了しません。クラスタリングに使っている、非Daemonスレッドが残り続けてしまうためです。

キャッシュに対するイベントをコンソールに出力するため、Listenerを付けています。
src/main/scala/LoggingListener.scala

import org.infinispan.notifications.Listener
import org.infinispan.notifications.cachelistener.annotation.{CacheEntryCreated, CacheEntryRemoved}
import org.infinispan.notifications.cachelistener.event.{CacheEntryCreatedEvent, CacheEntryRemovedEvent}

@Listener
class LoggingListener {
  @CacheEntryCreated
  def observeAdd(event: CacheEntryCreatedEvent[_, _]): Unit =
    if (!event.isPre) println(s"Cache entry with key [${event.getKey}] added in cache [${event.getCache}]")
    else ()

  @CacheEntryRemoved
  def observeRemoved(event: CacheEntryRemovedEvent[_, _]): Unit =
    println(s"Cache entry with key [${event.getKey}] removed in cache [${event.getCache}]")
}

設定ファイルは、2つ用意しました。
まずは本体の方を。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:5.2 http://www.infinispan.org/schemas/infinispan-config-5.2.xsd"
    xmlns="urn:infinispan:config:5.2">
   
  <global>
    <transport clusterName="demoCluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics enabled="true"/>
  </global>

  <default>
    <jmxStatistics enabled="true"/>
    <clustering mode="distribution">
    <!-- <clustering mode="replication"> -->
      <hash numOwners="2" />
      <sync />
    </clustering>
  </default>
</infinispan>

クラスタの設定は、この部分で行っています。

    <clustering mode="distribution">
    <!-- <clustering mode="replication"> -->
      <hash numOwners="2" />
      <sync />
    </clustering>

設定は、たぶんこんな感じになっています。

  • clusteringタグのmode属性 → 分散キャッシュかレプリケーションかを指定(今回は分散キャッシュ)
  • hashタグのnumOwners属性 → データのコピーをいくつのノードに持つか(今回は2つ)
  • syncタグ → キャッシュの同期転送

syncタグの意味は、これだと思います。
https://access.redhat.com/knowledge/docs/ja-JP/JBoss_Data_Grid/6/html-single/Administration_and_Configuration_Guide/index.html#Asynchronous_and_Synchronous_Operations
numOwnersの意味は、たぶんこちら。
7.4.4.5. 分散されたデータグリッドの設定
https://access.redhat.com/knowledge/docs/ja-JP/JBoss_Data_Grid/6/html-single/Getting_Started_Guide/index.html#sect-Configure_the_Cluster

続いて、src/main/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd">
   <UDP
         mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}"
         mcast_port="${jgroups.udp.mcast_port:46655}"
         tos="8"
         ucast_recv_buf_size="100000"
         ucast_send_buf_size="100000"
         mcast_recv_buf_size="100000"
         mcast_send_buf_size="100000"
         loopback="true"
         max_bundle_size="64000"
         max_bundle_timeout="30"
         ip_ttl="${jgroups.udp.ip_ttl:2}"
         enable_bundling="true"
         enable_unicast_bundling="true"
         enable_diagnostics="false"

         thread_naming_pattern="cl"

         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="4"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="100"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="2"
         oob_thread_pool.max_threads="4"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="discard"
         />

   <PING timeout="3000" num_initial_members="2"/>
   <MERGE2 max_interval="30000" min_interval="10000"/>
   <FD_SOCK/>
   <FD_ALL/>
   <VERIFY_SUSPECT timeout="1500"/>
   <BARRIER />
   <pbcast.NAKACK use_mcast_xmit="true"
                  retransmit_timeout="300,600,1200"
                  discard_delivered_msgs="true"/>
   <UNICAST2 />
   <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="1000000"/>
   <pbcast.GMS print_local_addr="true" join_timeout="3000" max_bundling_time="200" view_bundling="true"/>
   <UFC max_credits="500000" min_threshold="0.20"/>
   <MFC max_credits="500000" min_threshold="0.20"/>
   <FRAG2 frag_size="60000" />
</config>

設定の意味は、まだほとんどわかっていません…。このファイルがなくてもデフォルトの設定で起動するのですが、使っているOSのバッファサイズとかがJGroupsのデフォルト値にまったく合わなくて警告されまくるので、Infinispanに同梱されているサンプルと
https://github.com/infinispan/infinispan-quickstart/blob/master/clustered-cache/src/main/resources/jgroups.xml
を使って適当に設定しました…。

近く、ちゃんと調べます…。

では、動かしてみます。sbtコンソールを2つ起動して、それぞれ以下の様に実行します。

# コンソール1(node1)
> run node1 node2

# コンソール2(node2)
> run node2 node1

これで、コンソール1と2をパパッと実行します。

コンソール1(node1)

> run node1 node2
[info] Running EmbeddedClusteredClient node1 node2
[error] 2 09, 2013 8:53:32 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start
[error] INFO: ISPN000078: Starting JGroups Channel
[info] 
[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-23062, cluster=demoCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:56051
[info] -------------------------------------------------------------------
[error] 2 09, 2013 8:53:37 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-23062|0] [ubuntu-23062]
[error] 2 09, 2013 8:53:37 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport startJGroupsChannelIfNeeded
[error] INFO: ISPN000079: Cache local address is ubuntu-23062, physical addresses are [fe80:0:0:0:20c:29ff:fe5c:cfec%2:56051]
[error] 2 09, 2013 8:53:37 午後 org.infinispan.factories.GlobalComponentRegistry start
[error] INFO: ISPN000128: Infinispan version: Infinispan 'Delirium' 5.2.0.Final
[error] 2 09, 2013 8:53:37 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-23062|1] [ubuntu-23062, ubuntu-14783]
[error] 2 09, 2013 8:53:37 午後 org.infinispan.jmx.CacheJmxRegistration start
[error] INFO: ISPN000031: MBeans were successfully registered to the platform MBean server.
[info] Cache entry with key [node1:1] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node1:2] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node1:3] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node1:4] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node1:5] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Sleeping...
[info] Cache entry with key [node2:1] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node2:2] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node2:3] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node2:4] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] Cache entry with key [node2:5] added in cache [Cache '___defaultcache'@ubuntu-23062]
[info] [node1] received, node1:1 => value1
[info] [node1] received, node1:2 => value2
[info] [node1] received, node1:3 => value3
[info] [node1] received, node1:4 => value4
[info] [node1] received, node1:5 => value5
[info] [node1] received, node2:1 => value1
[info] [node1] received, node2:2 => value2
[info] [node1] received, node2:3 => value3
[info] [node1] received, node2:4 => value4
[info] [node1] received, node2:5 => value5
[error] 2 09, 2013 8:53:48 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop
[error] INFO: ISPN000080: Disconnecting and closing JGroups Channel
[error] 2 09, 2013 8:53:48 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop
[error] INFO: ISPN000082: Stopping the RpcDispatcher
[success] Total time: 19 s, completed 2013/02/09 20:53:48

コンソール2(node2)

> run node2 node1
[info] Running EmbeddedClusteredClient node2 node1
[error] 2 09, 2013 8:53:33 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start
[error] INFO: ISPN000078: Starting JGroups Channel
[info] 
[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-14783, cluster=demoCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:59560
[info] -------------------------------------------------------------------
[error] 2 09, 2013 8:53:37 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-23062|1] [ubuntu-23062, ubuntu-14783]
[error] 2 09, 2013 8:53:38 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport startJGroupsChannelIfNeeded
[error] INFO: ISPN000079: Cache local address is ubuntu-14783, physical addresses are [fe80:0:0:0:20c:29ff:fe5c:cfec%2:59560]
[error] 2 09, 2013 8:53:38 午後 org.infinispan.factories.GlobalComponentRegistry start
[error] INFO: ISPN000128: Infinispan version: Infinispan 'Delirium' 5.2.0.Final
[error] 2 09, 2013 8:53:38 午後 org.infinispan.jmx.CacheJmxRegistration start
[error] INFO: ISPN000031: MBeans were successfully registered to the platform MBean server.
[info] Cache entry with key [node2:1] added in cache [Cache '___defaultcache'@ubuntu-14783]
[info] Cache entry with key [node2:2] added in cache [Cache '___defaultcache'@ubuntu-14783]
[info] Cache entry with key [node2:3] added in cache [Cache '___defaultcache'@ubuntu-14783]
[info] Cache entry with key [node2:4] added in cache [Cache '___defaultcache'@ubuntu-14783]
[info] Cache entry with key [node2:5] added in cache [Cache '___defaultcache'@ubuntu-14783]
[info] Sleeping...
[error] 2 09, 2013 8:53:48 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-14783|2] [ubuntu-14783]
[info] [node2] received, node2:1 => value1
[info] [node2] received, node2:2 => value2
[info] [node2] received, node2:3 => value3
[info] [node2] received, node2:4 => value4
[info] [node2] received, node2:5 => value5
[info] [node2] received, node1:1 => value1
[info] [node2] received, node1:2 => value2
[info] [node2] received, node1:3 => value3
[info] [node2] received, node1:4 => value4
[info] [node2] received, node1:5 => value5
[error] 2 09, 2013 8:53:48 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop
[error] INFO: ISPN000080: Disconnecting and closing JGroups Channel
[error] 2 09, 2013 8:53:48 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop
[error] INFO: ISPN000082: Stopping the RpcDispatcher
[success] Total time: 19 s, completed 2013/02/09 20:53:49

一応、両方のnodeで同じデータが見れています。

# node1
[info] [node1] received, node1:1 => value1
[info] [node1] received, node1:2 => value2
[info] [node1] received, node1:3 => value3
[info] [node1] received, node1:4 => value4
[info] [node1] received, node1:5 => value5
[info] [node1] received, node2:1 => value1
[info] [node1] received, node2:2 => value2
[info] [node1] received, node2:3 => value3
[info] [node1] received, node2:4 => value4
[info] [node1] received, node2:5 => value5

# node2
[info] [node2] received, node2:1 => value1
[info] [node2] received, node2:2 => value2
[info] [node2] received, node2:3 => value3
[info] [node2] received, node2:4 => value4
[info] [node2] received, node2:5 => value5
[info] [node2] received, node1:1 => value1
[info] [node2] received, node1:2 => value2
[info] [node2] received, node1:3 => value3
[info] [node2] received, node1:4 => value4
[info] [node2] received, node1:5 => value5

途中で出ている

[info] Cache entry with key [node1:1] added in cache [Cache '___defaultcache'@ubuntu-23062]

みたいなログは、仕込んだListenerが吐いているものです。node1の方では、node2で追加したエントリがログ出力されています。そういうのは出ていませんが…。

結果として、両nodeで同じデータが見えているので、一応共有できているということかな?まあ、numOwnersが2なので、両方とも同じデータを持っているんでしょうけど。

レプリケーションモードにしても、見た目の動きは変わらないので割愛。

ちなみに、設定はこのままで

> run node1 node2 node3
> run node2 node1 node3
> run node3 node1 node2

みたいに3ノード以上を使おうとするとどこかのnodeで

[error] WARN: ISPN000071: Caught exception when handling command CacheTopologyControlCommand{cache=___defaultcache, type=REBALANCE_CONFIRM, sender=ubuntu-4378, joinInfo=null, topologyId=6, currentCH=null, pendingCH=null, throwable=null, viewId=2}
[error] org.infinispan.CacheException: Received invalid rebalance confirmation from ubuntu-4378 for cache ___defaultcache, we don't have a rebalance in progress

...

[error] ERROR: Failed to start rebalance: null
[error] java.lang.InterruptedException

とか、

[error] ERROR: Failed to start rebalance: org.infinispan.CacheException: Remote (ubuntu-18047) failed unexpectedly
[error] java.util.concurrent.ExecutionException: org.infinispan.CacheException: Remote (ubuntu-18047) failed unexpectedly

みたいなエラーを見ます。

クラスタリングのモードをレプリケーションモードにしたり、numOwnersを3にすると出なくなるので、先に終了しちゃったnodeが持ってたデータに対する振り先がなくなるので、再バランシングできなくなったよってことかな?