ホントは、先週書こうとしていたエントリなのですが、InfinispanのDistributed Execution Frameworkを触ってみました。
Distributed Execution Framework
https://docs.jboss.org/author/display/ISPN/Infinispan+Distributed+Execution+Framework
ドキュメントを見た時は、最初意味がわからなかったのですが…。
Distributed Execution Frameworkというのは、Infinispan上で分散処理を実行するためのフレームワークです。って、名前のままですね。
ばくっというと、以下のような感じになっています。
- java.util.concurrent.Callable、ExecutorService、Futureを拡張して利用
- クラスタ環境で動作している各Infinispanのインスタンスに対して、Callableのインスタンスを転送することができる
- 結果は、ExecutorServiceに処理を依頼した側に、Futureとして返される
- Callableは、Infinispanが拡張したインターフェースを使用することで、転送先のInfinispanに合わせたKeyのセット、そしてCacheを使用することができる
たとえば、
Infinispan[Embedded] | Infinispan[Embedded] | Infinispan[Embedded]
みたいにInfinispanが並んでいたとして、どれかひとつに対してExecutorServiceに対して処理を依頼すると
Infinispan[Embedded] <= ExecutorService#submitEverywhere(Callable) | Infinispan[Embedded] | Infinispan[Embedded]
各インスタンスにCallableが転送されて、そこで処理が実行されます。
Infinispan[Embedded] Callable#call | Infinispan[Embedded] Callable#call | Infinispan[Embedded] Callable#call
で、結果は元のインスタンスに対して、FutureのListとして戻ります。
Infinispan[Embedded] => List<Future> | Infinispan[Embedded] | Infinispan[Embedded]
そんな仕組み。
*単一のインスタンスで実行してFutureが返る、submitも使えます
とりあえず、こんな環境準備をしました。
build.sbt
name := "infinispan-distributed-execution-framework-example" version := "0.0.1" scalaVersion := "2.10.1" organization := "littlewings" fork in run := true scalacOptions += "-deprecation" resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/" libraryDependencies += "org.infinispan" % "infinispan-core" % "5.2.1.Final"
Scalaをしれっと2.10.1に。Infinispanは、Mavenリポジトリを見る限り3/12に5.2.5.Finalがデプロイされているようなのですが、オフィシャルサイトにはアナウンスが出てないのでいったん5.2.1.Finalのままで。
最終的に、クラスタキャッシュも使用するので、設定ファイルも用意しておきました。
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:5.2 http://www.infinispan.org/schemas/infinispan-config-5.2.xsd" xmlns="urn:infinispan:config:5.2"> <global> <transport clusterName="distributionFrameworkCluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true"/> </global> <default> <jmxStatistics enabled="true"/> <clustering mode="distribution"> <hash numOwners="1" /> <sync /> </clustering> </default> </infinispan>
分散キャッシュを使用するので、クラスタのモードは「distribution」に、それからnumOwnersは1に絞り込んでいます。
src/main/resources/jgroups.xml
<?xml version="1.0" encoding="UTF-8"?> <config xmlns="urn:org:jgroups" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd"> <UDP mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}" mcast_port="${jgroups.udp.mcast_port:45688}" tos="8" ucast_recv_buf_size="130000" ucast_send_buf_size="100000" mcast_recv_buf_size="130000" mcast_send_buf_size="100000" loopback="true" max_bundle_size="64000" max_bundle_timeout="30" ip_ttl="${jgroups.udp.ip_ttl:2}" enable_bundling="true" enable_unicast_bundling="true" enable_diagnostics="true" diagnostics_addr="${jboss.jgroups.diagnostics_addr:224.0.0.75}" diagnostics_port="${jboss.jgroups.diagnostics_port:7500}" thread_naming_pattern="hr-cl" thread_pool.enabled="true" thread_pool.min_threads="2" thread_pool.max_threads="8" thread_pool.keep_alive_time="5000" thread_pool.queue_enabled="true" thread_pool.queue_max_size="1000" thread_pool.rejection_policy="discard" oob_thread_pool.enabled="true" oob_thread_pool.min_threads="2" oob_thread_pool.max_threads="8" oob_thread_pool.keep_alive_time="1000" oob_thread_pool.queue_enabled="false" oob_thread_pool.rejection_policy="discard" /> <PING timeout="3000" num_initial_members="2" /> <!-- <FD timeout="3000" max_tries="5" /> --> <FD_ALL interval="3000" timeout="10000" /> <FD_SOCK /> <VERIFY_SUSPECT timeout="1500" /> <UNICAST2 /> <pbcast.NAKACK2 use_mcast_xmit="false" xmit_interval="1000" discard_delivered_msgs="true" /> <pbcast.GMS print_local_addr="true" join_timeout="3000" leave_timeout="3000" merge_timeout="3000" max_bundling_time="200" view_bundling="true" /> <UFC max_credits="500000" min_threshold="0.20" /> <MFC max_credits="500000" min_threshold="0.20" /> <MERGE3 max_interval="30000" min_interval="10000" /> <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="400000" /> <FRAG2 frag_size="60000" /> </config>
DefaultExecutorServiceとCallableを使用する
DefaultExecutorService#submit
1番基本的なパターンです。まずは、Callableの実装として以下のようなものを用意します。
src/main/scala/MyCallable.scala
import java.util.concurrent.Callable class MyCallable extends Callable[Integer] { println("create mycallable instance") def call(): Integer = { println("called") 10 } }
すごく単純な、Callableの実装ですね…。
利用する側として、こんなのを用意しました。
src/main/scala/DistributedExecutionFrameworkExample.scala
import scala.collection.JavaConverters._ import java.util.Set import org.infinispan.Cache import org.infinispan.manager.DefaultCacheManager import org.infinispan.distexec.DefaultExecutorService object DistributedExecutionFrameworkExample { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[String, Integer]() val des = new DefaultExecutorService(cache) try { val future = des.submit(new MyCallable) println(s"ExecutorService#submit Result => ${future.get}") } finally { des.shutdown() cache.stop() manager.stop() } } }
ExecutorServiceは、Infinispanが提供しているDefaultExecutorServiceクラスを使用します。インスタンス生成の際には、InfinispanのCacheが必要です。
val des = new DefaultExecutorService(cache)
あとは、submitすればFutureが返ってきます。
val future = des.submit(new MyCallable) println(s"ExecutorService#submit Result => ${future.get}")
なお、DefaultExecutorServiceクラスのコンストラクタの引数は、BasicCacheではなくCacheなので、Hot Rodでは使用できないと思われます。
では、実行してみます。
> run-main DistributedExecutionFrameworkExample [info] Running DistributedExecutionFrameworkExample 〜省略〜 [info] create mycallable instance [info] called [info] ExecutorService#submit Result => 10 〜省略〜 [success] Total time: 6 s, completed 2013/03/16 16:38:15
まあ、これだけだと普通のjava.util.concurrent.ExecutorServiceを使った場合となんら変わりがありません。
DefaultExecutorService#submitEverywhere
ここで、もうひとつInfinispanのインスタンスを追加します。
src/main/scala/EmbeddedCacheServer.scala
import org.infinispan.manager.DefaultCacheManager object EmbeddedCacheServer { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[String, String]() cache.addListener(new LoggingListener) } }
Cache#stopとDefaultCacheManager#stopを呼ばないとJavaVMが終了しないので、それを利用してちょっと浮いててもらいます。あと、LoggingListenerというクラスはまた後で説明します。
以降、最初に使っていたDistributedExecutionFrameworkExample内で動作しているInfinispanをインスタンス-A、EmbeddedCacheServerで浮かせているInfinspanをインスタンス-Bと表記します。
では、インスタンス-Bを起動。
$ sbt "run-main EmbeddedCacheServer" [info] Set current project to infinispan-distributed-execution-framework-example (in build file:/xxxxx/infinispan-distributed-execution-framework/) [info] Running EmbeddedCacheServer [error] 3 16, 2013 4:47:02 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start [error] INFO: ISPN000078: Starting JGroups Channel [info] [info] ------------------------------------------------------------------- [info] GMS: address=ubuntu-63354, cluster=distributionFrameworkCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:46321 [info] ------------------------------------------------------------------- [error] 3 16, 2013 4:47:06 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted [error] INFO: ISPN000094: Received new cluster view: [ubuntu-63354|0] [ubuntu-63354] [error] 3 16, 2013 4:47:06 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport startJGroupsChannelIfNeeded [error] INFO: ISPN000079: Cache local address is ubuntu-63354, physical addresses are [fe80:0:0:0:20c:29ff:fe5c:cfec%2:46321] [error] 3 16, 2013 4:47:06 午後 org.infinispan.factories.GlobalComponentRegistry start [error] INFO: ISPN000128: Infinispan version: Infinispan 'Delirium' 5.2.1.Final [error] 3 16, 2013 4:47:06 午後 org.infinispan.jmx.CacheJmxRegistration start [error] INFO: ISPN000031: MBeans were successfully registered to the platform MBean server.
では、先ほどのサンプル(DistributedExecutionFrameworkExample)で
val future = des.submit(new MyCallable) println(s"ExecutorService#submit Result => ${future.get}")
と書かれていた箇所を
val futures = des.submitEverywhere(new MyCallable) futures.asScala.foreach(f => println(f.get))
と変更して実行します。
> run-main DistributedExecutionFrameworkExample 〜省略〜 [error] 3 16, 2013 4:49:24 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop [error] INFO: ISPN000080: Disconnecting and closing JGroups Channel [error] 3 16, 2013 4:49:25 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop [error] INFO: ISPN000082: Stopping the RpcDispatcher [error] org.infinispan.CacheException: org.infinispan.marshall.NotSerializableException: MyCallable [error] at org.infinispan.util.Util.cloneWithMarshaller(Util.java:259) [error] at org.infinispan.distexec.DefaultExecutorService.clone(DefaultExecutorService.java:555) [error] at org.infinispan.distexec.DefaultExecutorService.submitEverywhere(DefaultExecutorService.java:510) [error] at org.infinispan.distexec.DefaultExecutorService.submitEverywhere(DefaultExecutorService.java:497) 〜省略〜
Callableがシリアライズできないよ?と怒られました。
というわけで、先ほどのMyCallableクラスにSerializableを実装してリトライ。
@SerialVersionUID(1L) class MyCallable extends Callable[Integer] with Serializable {
浮かしておいたインスタンス-Bも、再起動しておきます。では、実行。
> run-main DistributedExecutionFrameworkExample [info] Compiling 1 Scala source to /xxxxx/infinispan-distributed-execution-framework/target/scala-2.10/classes... [info] Compiling 2 Scala sources to /xxxxx/infinispan-distributed-execution-framework/target/scala-2.10/classes... [info] Running DistributedExecutionFrameworkExample [error] 3 16, 2013 4:52:20 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start [error] INFO: ISPN000078: Starting JGroups Channel [info] [info] ------------------------------------------------------------------- [info] GMS: address=ubuntu-13073, cluster=distributionFrameworkCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:50966 [info] ------------------------------------------------------------------- 〜省略〜 [info] create mycallable instance [info] 10 [info] called [info] 10 〜省略〜 [error] INFO: ISPN000082: Stopping the RpcDispatcher [success] Total time: 6 s, completed 2013/03/16 16:52:22
今度は、結果が返ってきました。
インスタンス-B側でも、Callableが実行されたことがわかります。
[error] 3 16, 2013 4:52:21 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted [error] INFO: ISPN000094: Received new cluster view: [ubuntu-16854|1] [ubuntu-16854, ubuntu-13073] [info] called <---- コレ [error] 3 16, 2013 4:52:22 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted [error] INFO: ISPN000094: Received new cluster view: [ubuntu-16854|2] [ubuntu-16854]
ただ、このままだと単なるCallableを各インスタンスに転送しただけで、InfinispanのCacheを使用した処理は難しいです。できない、ってわけではないでしょうけど。
ここで利用するのが、DistributedCallableインターフェースのようです。
DefaultExecutorServiceとDistributedCallableを使用する
DefaultExecutorService#submit
DistributedCallableインターフェースは、Callableインターフェースを拡張したものです。
public interface DistributedCallable<K, V, T> extends Callable<T> { public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys); }
このインターフェースを利用すると、このCallableで使用するCacheとタスクの実行時にもらったキーのSetを受け取ることができるようになります。
では、DistributedCallableインターフェースを実装したクラスを作成してみます。
src/main/scala/MyDistributedCallable.scala
import scala.collection.JavaConverters._ import org.infinispan.Cache import org.infinispan.distexec.DistributedCallable class MyDistributedCallable extends DistributedCallable[String, Integer, Integer] { var inputKeys: Set[String] = _ var cache: Cache[String, Integer] = _ println("create distributedcallable instance") def setEnvironment(cache: Cache[String, Integer], inputKeys: java.util.Set[String]): Unit = { println(s"Input Keys => Size:${inputKeys.size}, Values:${inputKeys}") this.inputKeys = Set.empty ++ inputKeys.asScala this.cache = cache } def call(): Integer = { println("called") inputKeys.foldLeft(0) { (acc, key) => cache.get(key) match { case null => acc case v => acc + v } } } }
DistributedCallable#setEnvironmentが呼ばれたタイミングで、受け取ったKey一式を出力するようにしています。
で、先ほどのDistributedExecutionFrameworkExampleオブジェクトを変更します。
def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[String, Integer]() val limit = 50 val keys = new Array[String](limit) var total = 0 for (i <- 1 to limit) { val key = "key" + i keys(i - 1) = key cache.put(key, i) total += i } val des = new DefaultExecutorService(cache) try { val future = des.submit(new MyDistributedCallable, keys: _*) println(s"ExecutorService#submit Result => ${future.get}, Local Sum = ${total}") } finally { des.shutdown() cache.stop() manager.stop() } }
1〜50までのKeyとValueをCacheに突っ込みつつ、その時のKey一式をDefaultExecutorService#submitの引数に、DistributedCallableのインスタンスと一緒に渡します。
結果はFutureとして戻ってくるので、その結果と最初に自前で合算した結果を出力してお終いです。
では、実行してみます。
> run-main DistributedExecutionFrameworkExample 〜省略〜 [info] Input Keys => Size:50, Values:[key38, key39, key34, key35, key36, key37, key42, key41, key44, key43, key40, key4, key3, key6, key5, key2, key1, key49, key10, key11, key47, key8, key48, key7, key45, key46, key9, key50, key21, key22, key20, key15, key14, key13, key12, key19, key18, key17, key16, key30, key31, key32, key33, key24, key23, key26, key25, key28, key27, key29] [info] called [info] ExecutorService#submit Result => 1275, Local Sum = 1275 〜省略〜 [success] Total time: 8 s, completed 2013/03/16 17:13:00
JGroupsその他のログは、端折りました。渡したKeyがDistributedCallableに渡され、CacheからKeyを使ってValueを合算できましたね。
DefaultExecutorService#submitEverywhere
最後、DefaultExecutorService#submitEverywhereを使用します。なので、ここで再び先ほどのインスタンス-Bに登場してもらいます。
シリアライズが必要だと思われるので、MyDistributedCallableクラスはシリアライズ可能にしておきます。
@SerialVersionUID(1L) class MyDistributedCallable extends DistributedCallable[String, Integer, Integer] with Serializable {
また、今回は分散キャッシュで使用するので、Keyがどのインスタンスに設定されたのか分かるように、CacheにLoggerを付けておくことにします。
src/main/scala/LoggingListener.scala import org.infinispan.notifications.Listener import org.infinispan.notifications.cachelistener.annotation.{CacheEntryCreated, CacheEntryRemoved} import org.infinispan.notifications.cachelistener.event.{CacheEntryCreatedEvent, CacheEntryRemovedEvent} @Listener class LoggingListener { @CacheEntryCreated def observeAdd(event: CacheEntryCreatedEvent[_, _]): Unit = if (!event.isPre) println(s"Cache entry with key [${event.getKey}] added in cache [${event.getCache.getName}]") else () @CacheEntryRemoved def observeRemoved(event: CacheEntryRemovedEvent[_, _]): Unit = println(s"Cache entry with key [${event.getKey}] removed in cache [${event.getCache.getName}]") }
src/main/scala/DistributedExecutionFrameworkExample.scala
val cache = manager.getCache[String, Integer]() cache.addListener(new LoggingListener)
そして、submitEverywhereを使うように修正。
val futures = des.submitEverywhere(new MyDistributedCallable, keys: _*) val callableTotal = futures.asScala.foldLeft(0) { (acc, f) => acc + f.get } println(s"ExecutorService#submitEverywhere Result => ${callableTotal}, Local Sum = ${total}")
ここで、インスタンス-Bを起動しておきます。
$ sbt "run-main EmbeddedCacheServer"
インスタンス-Aから実行します。
> run-main DistributedExecutionFrameworkExample 〜省略〜 [info] Cache entry with key [key5] added in cache [___defaultcache] [info] Cache entry with key [key9] added in cache [___defaultcache] [info] Cache entry with key [key10] added in cache [___defaultcache] [info] Cache entry with key [key13] added in cache [___defaultcache] [info] Cache entry with key [key14] added in cache [___defaultcache] [info] Cache entry with key [key16] added in cache [___defaultcache] [info] Cache entry with key [key17] added in cache [___defaultcache] [info] Cache entry with key [key18] added in cache [___defaultcache] [info] Cache entry with key [key22] added in cache [___defaultcache] [info] Cache entry with key [key23] added in cache [___defaultcache] [info] Cache entry with key [key28] added in cache [___defaultcache] [info] Cache entry with key [key29] added in cache [___defaultcache] [info] Cache entry with key [key30] added in cache [___defaultcache] [info] Cache entry with key [key32] added in cache [___defaultcache] [info] Cache entry with key [key34] added in cache [___defaultcache] [info] Cache entry with key [key35] added in cache [___defaultcache] [info] Cache entry with key [key36] added in cache [___defaultcache] [info] Cache entry with key [key38] added in cache [___defaultcache] [info] Cache entry with key [key40] added in cache [___defaultcache] [info] Cache entry with key [key44] added in cache [___defaultcache] [info] Cache entry with key [key46] added in cache [___defaultcache] [info] Cache entry with key [key49] added in cache [___defaultcache] [info] Cache entry with key [key50] added in cache [___defaultcache] [info] create distributedcallable instance [info] Input Keys => Size:23, Values:[key5, key38, key49, key10, key34, key30, key35, key22, key36, key32, key46, key9, key23, key14, key44, key13, key28, key50, key18, key40, key17, key29, key16] [info] called [info] ExecutorService#submitEverywhere Result => 1275, Local Sum = 1275 〜省略〜 [success] Total time: 6 s, completed 2013/03/16 17:27:59
ログから、インスタンス-Aには、KeyとValueが23個登録されたようです。DistributedCallable#setEnvironmentに渡されたKeyの数も、やっぱり23個ですね。
計算結果は、それぞれのFutureの結果を合わせると同じですよ、と。
渡ってくるKeyは、java.util.HashSetになっているので、順番はバラバラです。
一方、インスタンス-B側のコンソールはこうなっています。
[error] 3 16, 2013 5:27:58 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted [error] INFO: ISPN000094: Received new cluster view: [ubuntu-37697|1] [ubuntu-37697, ubuntu-60826] [info] Cache entry with key [key1] added in cache [___defaultcache] [info] Cache entry with key [key2] added in cache [___defaultcache] [info] Cache entry with key [key3] added in cache [___defaultcache] [info] Cache entry with key [key4] added in cache [___defaultcache] [info] Cache entry with key [key6] added in cache [___defaultcache] [info] Cache entry with key [key7] added in cache [___defaultcache] [info] Cache entry with key [key8] added in cache [___defaultcache] [info] Cache entry with key [key11] added in cache [___defaultcache] [info] Cache entry with key [key12] added in cache [___defaultcache] [info] Cache entry with key [key15] added in cache [___defaultcache] [info] Cache entry with key [key19] added in cache [___defaultcache] [info] Cache entry with key [key20] added in cache [___defaultcache] [info] Cache entry with key [key21] added in cache [___defaultcache] [info] Cache entry with key [key24] added in cache [___defaultcache] [info] Cache entry with key [key25] added in cache [___defaultcache] [info] Cache entry with key [key26] added in cache [___defaultcache] [info] Cache entry with key [key27] added in cache [___defaultcache] [info] Cache entry with key [key31] added in cache [___defaultcache] [info] Cache entry with key [key33] added in cache [___defaultcache] [info] Cache entry with key [key37] added in cache [___defaultcache] [info] Cache entry with key [key39] added in cache [___defaultcache] [info] Cache entry with key [key41] added in cache [___defaultcache] [info] Cache entry with key [key42] added in cache [___defaultcache] [info] Cache entry with key [key43] added in cache [___defaultcache] [info] Cache entry with key [key45] added in cache [___defaultcache] [info] Cache entry with key [key47] added in cache [___defaultcache] [info] Cache entry with key [key48] added in cache [___defaultcache] [info] Input Keys => Size:27, Values:[key39, key21, key20, key37, key15, key42, key41, key12, key43, key19, key4, key3, key6, key2, key1, key11, key47, key8, key48, key31, key7, key45, key33, key24, key26, key25, key27] [info] called [error] 3 16, 2013 5:27:59 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted [error] INFO: ISPN000094: Received new cluster view: [ubuntu-37697|2] [ubuntu-37697]
こちらには、残りの27個のKeyが転送されていますね。
というわけで、DistributedCallableインターフェースを使用すると、それぞれのInfinispanのインスタンスで稼働するDistributedCallableのインスタンスが使用するCacheと、Keyの整合性が取れるように割り振ってくれるようです。
このKeyを割り振ってくれるのは、便利だなぁと思いますね。
Callableインターフェースだけを実装した場合でも、submit時にKeyを渡すことはできるのですが、Callable側で受け取る術がありません。また、Cacheの面倒もみてもらえないので、素直にDistributedCallableインターフェースを使用するのが吉でしょう。
ちなみに、このサンプルをsubmitEverywhereではなく
val futures = des.submitEverywhere(new MyDistributedCallable, keys: _*) val callableTotal = futures.asScala.foldLeft(0) { (acc, f) => acc + f.get } println(s"ExecutorService#submitEverywhere Result => ${callableTotal}, Local Sum = ${total}")
submitを使うように変更すると、
val future = des.submit(new MyDistributedCallable, keys: _*) println(s"ExecutorService#submit Result => ${future.get}, Local Sum = ${total}")
どれかひとつのインスタンスでDistributedCallableのインスタンスが実行されるようになります。
インスタンス-A
> run-main DistributedExecutionFrameworkExample 〜省略〜 [info] Cache entry with key [key5] added in cache [___defaultcache] [info] Cache entry with key [key9] added in cache [___defaultcache] [info] Cache entry with key [key10] added in cache [___defaultcache] [info] Cache entry with key [key13] added in cache [___defaultcache] [info] Cache entry with key [key14] added in cache [___defaultcache] [info] Cache entry with key [key16] added in cache [___defaultcache] [info] Cache entry with key [key17] added in cache [___defaultcache] [info] Cache entry with key [key18] added in cache [___defaultcache] [info] Cache entry with key [key22] added in cache [___defaultcache] [info] Cache entry with key [key23] added in cache [___defaultcache] [info] Cache entry with key [key28] added in cache [___defaultcache] [info] Cache entry with key [key29] added in cache [___defaultcache] [info] Cache entry with key [key30] added in cache [___defaultcache] [info] Cache entry with key [key32] added in cache [___defaultcache] [info] Cache entry with key [key34] added in cache [___defaultcache] [info] Cache entry with key [key35] added in cache [___defaultcache] [info] Cache entry with key [key36] added in cache [___defaultcache] [info] Cache entry with key [key38] added in cache [___defaultcache] [info] Cache entry with key [key40] added in cache [___defaultcache] [info] Cache entry with key [key44] added in cache [___defaultcache] [info] Cache entry with key [key46] added in cache [___defaultcache] [info] Cache entry with key [key49] added in cache [___defaultcache] [info] Cache entry with key [key50] added in cache [___defaultcache] [info] create distributedcallable instance [info] Input Keys => Size:50, Values:[key38, key39, key34, key35, key36, key37, key42, key41, key44, key43, key40, key4, key3, key6, key5, key2, key1, key49, key10, key11, key47, key8, key48, key7, key45, key46, key9, key50, key21, key22, key20, key15, key14, key13, key12, key19, key18, key17, key16, key30, key31, key32, key33, key24, key23, key26, key25, key28, key27, key29] [info] called [info] ExecutorService#submit Result => 1275, Local Sum = 1275 〜省略〜 [success] Total time: 4 s, completed 2013/03/16 17:42:35
インスタンス-B
[error] 3 16, 2013 5:42:33 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted [error] INFO: ISPN000094: Received new cluster view: [ubuntu-13349|7] [ubuntu-13349, ubuntu-44271] [error] 3 16, 2013 5:42:35 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted [error] INFO: ISPN000094: Received new cluster view: [ubuntu-13349|8] [ubuntu-13349]
インスタンス-Bには、JGroupsでのメンバー情報が変わったくらいのログしかなく、インスタンス-AにすべてのKeyが渡っています。ただし、結果はsubmitEverywhereを使った時と一緒。
今回はインスタンス-Aの上でDistributedCallableのインスタンスが実行されましたが、インスタンス-Bの上で実行されることもありました。実行されたインスタンス上に、CacheのKeyに対応するエントリがない場合は、結局ネットワーク通信が発生しちゃうんでしょうね。
その他、タスクが失敗した時のフェイルオーバーの設定
https://docs.jboss.org/author/display/ISPN/Infinispan+Distributed+Execution+Framework#InfinispanDistributedExecutionFramework-Distributedtaskfailover
実行ポリシーの設定
https://docs.jboss.org/author/display/ISPN/Infinispan+Distributed+Execution+Framework#InfinispanDistributedExecutionFramework-Distributedtaskexecutionpolicy
上記と合わせてタイムアウトなどの設定
https://docs.jboss.org/author/display/ISPN/Infinispan+Distributed+Execution+Framework#InfinispanDistributedExecutionFramework-DistributedExecutorService%2CDistributedTaskBuilderandDistributedTaskAPI
などもできるようですが、今回は割愛。
CDIもドキュメントに載っていましたが、実行してみた感じだとJBoss ASとか要りそうだったのでやっぱりお見送り。
でも、なかなか面白いフレームワークでした。
最後に、全コードをもう1度載せておきます。
src/main/scala/DistributedExecutionFrameworkExample.scala import scala.collection.JavaConverters._ import java.util.Set import org.infinispan.Cache import org.infinispan.manager.DefaultCacheManager import org.infinispan.distexec.DefaultExecutorService object DistributedExecutionFrameworkExample { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[String, Integer]() cache.addListener(new LoggingListener) val limit = 50 val keys = new Array[String](limit) var total = 0 for (i <- 1 to limit) { val key = "key" + i keys(i - 1) = key cache.put(key, i) total += i } val des = new DefaultExecutorService(cache) try { /* val future = des.submit(new MyCallable) println(s"ExecutorService#submit Result => ${future.get}") */ /* val futures = des.submitEverywhere(new MyCallable) futures.asScala.foreach(f => println(f.get)) */ /* val future = des.submit(new MyDistributedCallable, keys: _*) println(s"ExecutorService#submit Result => ${future.get}, Local Sum = ${total}") */ val futures = des.submitEverywhere(new MyDistributedCallable, keys: _*) val callableTotal = futures.asScala.foldLeft(0) { (acc, f) => acc + f.get } println(s"ExecutorService#submitEverywhere Result => ${callableTotal}, Local Sum = ${total}") } finally { des.shutdown() cache.stop() manager.stop() } } }
src/main/scala/MyDistributedCallable.scala
import scala.collection.JavaConverters._ import org.infinispan.Cache import org.infinispan.distexec.DistributedCallable @SerialVersionUID(1L) class MyDistributedCallable extends DistributedCallable[String, Integer, Integer] with Serializable { var inputKeys: Set[String] = _ var cache: Cache[String, Integer] = _ println("create distributedcallable instance") def setEnvironment(cache: Cache[String, Integer], inputKeys: java.util.Set[String]): Unit = { println(s"Input Keys => Size:${inputKeys.size}, Values:${inputKeys}") this.inputKeys = Set.empty ++ inputKeys.asScala this.cache = cache } def call(): Integer = { println("called") inputKeys.foldLeft(0) { (acc, key) => cache.get(key) match { case null => acc case v => acc + v } } } }
src/main/scala/MyCallable.scala
import java.util.concurrent.Callable @SerialVersionUID(1L) class MyCallable extends Callable[Integer] with Serializable { println("create mycallable instance") def call(): Integer = { println("called") 10 } }
src/main/scala/LoggingListener.scala
import org.infinispan.notifications.Listener import org.infinispan.notifications.cachelistener.annotation.{CacheEntryCreated, CacheEntryRemoved} import org.infinispan.notifications.cachelistener.event.{CacheEntryCreatedEvent, CacheEntryRemovedEvent} @Listener class LoggingListener { @CacheEntryCreated def observeAdd(event: CacheEntryCreatedEvent[_, _]): Unit = if (!event.isPre) println(s"Cache entry with key [${event.getKey}] added in cache [${event.getCache.getName}]") else () @CacheEntryRemoved def observeRemoved(event: CacheEntryRemovedEvent[_, _]): Unit = println(s"Cache entry with key [${event.getKey}] removed in cache [${event.getCache.getName}]") }
src/main/scala/EmbeddedCacheServer.scala
import org.infinispan.manager.DefaultCacheManager object EmbeddedCacheServer { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[String, String]() cache.addListener(new LoggingListener) } }