先日、Infinispan 8.2.0.Finalがリリースされました。
Infinispan: Infinispan 8.2.0.Final is out!
追加された新機能のうち、今回はCluster Executorを試してみます。
Cluster Executorとは、クラスタ内の全Node、もしくは指定したNodeで処理を実行するための機能です。すでに似た機能としてDistributed Executor Frameworkがありますが、こちらと異なるのは実行するのにCacheが不要なこと(EmbeddedCacheManagerがあれば実行可能)なことです。
渡す処理は、シリアライズ可能である必要がありますが、これらをLambdaで簡単に書けるようにするために、org.infinispan.utilパッケージにSerializableなRunnableやFunctionが追加されています。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/util/SerializableRunnable.java
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/util/SerializableCallable.java
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/util/SerializableFunction.java
これらができたことと、利用する箇所の引数の型として定義されたことによってSerializableにするための交差型キャストがいらなくなったみたいです(Javaでは…)。
では、ちょっと使ってみましょう。
追記)
書いた後で思いましたが、Cacheの種類を含めて書いていますけど、あんまり意味なかったですね…。そもそも、「Cacheが要らない」って書いてるのに。
準備
ビルド定義。
build.sbt
name := "embedded-cluster-executor" version := "0.0.1-SNAPSHOT" organization := "org.littlewings" scalaVersion := "2.11.8" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xexperimental") updateOptions := updateOptions.value.withCachedResolution(true) parallelExecution in Test := false libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "8.2.0.Final", "net.jcip" % "jcip-annotations" % "1.0" % "provided", "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
今回は、SAMのサポートが欲しかったので、コンパイルオプションに「-Xexperimental」を入れました。
JGroupsの設定は、以下のとおり。
src/test/resources/jgroups.xml
<?xml version="1.0" encoding="UTF-8"?> <config xmlns="urn:org:jgroups" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.6.xsd"> <UDP mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}" mcast_port="${jgroups.udp.mcast_port:46655}" ucast_send_buf_size="130k" mcast_send_buf_size="130k" ucast_recv_buf_size="150k" mcast_recv_buf_size="200k" ip_ttl="${jgroups.ip_ttl:2}" thread_naming_pattern="pl" enable_diagnostics="false" bundler_type="sender-sends-with-timer" thread_pool.min_threads="${jgroups.thread_pool.min_threads:2}" thread_pool.max_threads="${jgroups.thread_pool.max_threads:30}" thread_pool.keep_alive_time="60000" thread_pool.queue_enabled="false" internal_thread_pool.min_threads="${jgroups.internal_thread_pool.min_threads:5}" internal_thread_pool.max_threads="${jgroups.internal_thread_pool.max_threads:20}" internal_thread_pool.keep_alive_time="60000" internal_thread_pool.queue_enabled="true" internal_thread_pool.queue_max_size="500" oob_thread_pool.min_threads="${jgroups.oob_thread_pool.min_threads:20}" oob_thread_pool.max_threads="${jgroups.oob_thread_pool.max_threads:200}" oob_thread_pool.keep_alive_time="60000" oob_thread_pool.queue_enabled="false" /> <PING/> <MERGE3 min_interval="10000" max_interval="30000" /> <FD_SOCK/> <FD_ALL timeout="60000" interval="15000" timeout_check_interval="5000" /> <VERIFY_SUSPECT timeout="5000" /> <pbcast.NAKACK2 xmit_interval="1000" xmit_table_num_rows="50" xmit_table_msgs_per_row="1024" xmit_table_max_compaction_time="30000" max_msg_batch_size="100" resend_last_seqno="true" /> <UNICAST3 xmit_interval="500" xmit_table_num_rows="50" xmit_table_msgs_per_row="1024" xmit_table_max_compaction_time="30000" max_msg_batch_size="100" conn_expiry_timeout="0" /> <pbcast.STABLE stability_delay="500" desired_avg_gossip="5000" max_bytes="1M" /> <pbcast.GMS print_local_addr="true" join_timeout="2000" /> <UFC max_credits="2m" min_threshold="0.40" /> <MFC max_credits="2m" min_threshold="0.40" /> <FRAG2/> </config>
バッファサイズを絞ったのと、pbcast.GMSでタイムアウトを短くしてサンプルの動作を軽くしておきました。
Infinispanの設定ファイルは、このようにLocal、Replicated、DistributedなCache、いずれも使えるようにしておきました。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd" xmlns="urn:infinispan:config:8.2"> <jgroups> <stack-file name="udp" path="jgroups.xml"/> </jgroups> <cache-container default-cache="localCache"> <jmx duplicate-domains="true"/> <transport cluster="cluster" stack="udp"/> <local-cache name="localCache"/> <replicated-cache name="replicatedCache"/> <distributed-cache name="distributedCache"/> </cache-container> </infinispan>
それでは、これらを使ってCluster Executorを使うコードを書いていきます。
テストコードの雛形
テストコードは、以下の中に書いていきます。
src/test/scala/org/littlewings/infinispan/clusterexecutor/ClusterExecutorSpec.scala
package org.littlewings.infinispan.clusterexecutor import java.util.function.Predicate import org.infinispan.Cache import org.infinispan.manager.{DefaultCacheManager, EmbeddedCacheManager} import org.infinispan.remoting.transport.Address import org.infinispan.util.{SerializableFunction, SerializableRunnable, TriConsumer} import org.jboss.logging.Logger import org.scalatest.{FunSpec, Matchers} class ClusterExecutorSpec extends FunSpec with Matchers with Serializable { describe("Cluster Executor Spec") { // ここに、テストを書く! } protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(f: Cache[K, V] => Unit): Unit = { val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml")) managers.foreach(_.getCache[K, V](cacheName)) try { val cache = managers(0).getCache[K, V](cacheName) f(cache) cache.stop() } finally { managers.foreach(_.getCache[K, V](cacheName).stop()) managers.foreach(_.stop()) } } }
テストといっても、今回はログ出力くらいにしているのですが…。
簡単な、クラスタ構成用のメソッド付きです。
では、ドキュメントを見つつ進めていきます。
2.2. Cluster Executor
ClusterExecutor#execute
まずは、ClusterExecutorを使ってみるところと、executeを呼び出してみます。
Local Cache…というか、単一のNodeでのサンプルは、こちら。
it("local cache, execute") { withCache[String, String]("localCache") { cache => cache.getCacheManager.executor.execute((() => { Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Execute!!", Thread.currentThread.getName) }): Runnable) } }
EmbeddedCacheManager#executorで、ClusterExecutorを取得することができます。今回は、ClusterManager#executeを使い、Runnableを実行します。
結果のログは、こちら。
3 17, 2016 8:27:59 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$11 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$1 INFO: Thread[remote-thread--p2-t1] Execute!!
これをクラスタ環境で実行すると(Nodeは3つで、Lambdaをシリアライズできる必要があるのでSerializableRunnableを明示しています)
it("replicated cache, execute") { withCache[String, String]("replicatedCache", 3) { cache => cache.getCacheManager.executor.execute((() => { Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Execute!!", Thread.currentThread.getName) }): SerializableRunnable) } } it("distributed cache, execute") { withCache[String, String]("distributedCache", 3) { cache => cache.getCacheManager.executor.execute((() => { Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Execute!!", Thread.currentThread.getName) }): SerializableRunnable) } }
それぞれ、3つのログが出力されます。
## Replicated Cache 3 17, 2016 8:28:04 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$mcV$sp$12 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$2 INFO: Thread[remote-thread--p9-t2] Execute!! 3 17, 2016 8:28:05 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$mcV$sp$12 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$2 INFO: Thread[remote-thread--p17-t2] Execute!! 3 17, 2016 8:28:05 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$mcV$sp$12 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$2 INFO: Thread[remote-thread--p13-t2] Execute!! ## Distributed Cache 3 17, 2016 8:28:09 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$13 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$3 INFO: Thread[remote-thread--p33-t2] Execute!! 3 17, 2016 8:28:09 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$13 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$3 INFO: Thread[remote-thread--p37-t1] Execute!! 3 17, 2016 8:28:09 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$13 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$3 INFO: Thread[remote-thread--p41-t1] Execute!!
クラスタを構成するEmbeddedCacheManagerでは、全Nodeで実行されるようです。スレッド名もログ出力しているのですが、それぞれ別のスレッドですね。
ちなみにClusterManager#executeは、最初にLocal Node、そしてRemote Nodeという順番で実行するようです。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L169
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L179
処理としては、非同期処理になるようですね。CompletableFutureをreturnしないので…。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L149
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L206
ClusterExecutor#filterTargets
先ほどのexecuteでは、クラスタが構成されている場合には全Nodeで処理が実行されましたが、ClusterExecutor#filterTargetsを利用することで、対象のNodeを絞ることができます。
it("distributed cache, filter and execute") { withCache[String, String]("distributedCache", 3) { cache => val self = cache.getAdvancedCache.getRpcManager.getAddress cache .getCacheManager .executor .filterTargets((address => self == address): Predicate[Address]) .execute((() => { Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Execute!!", Thread.currentThread.getName) }): SerializableRunnable) } }
filterTargetsメソッドでは、クラスタ内の各NodeのAddressが渡ってくるので、こちらを元に実行対象のNodeを決定します。今回のコードでは、Local Nodeのみで実行するようにしました。
3 17, 2016 8:28:12 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$14 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$4 INFO: Thread[remote-thread--p57-t2] Execute!!
なお、filterTargetsメソッド自体は、execute以外にも後述のsubmit、submitConsumerでも利用できます。
ClusterExecutor#submit
ClusterExecutor#executorと似ていますが、戻り値がCompletableFutureのClusterExecutor#submit。
it("local cache, submit") { withCache[String, String]("localCache") { cache => val completableFuture = cache.getCacheManager.executor.submit((() => { Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Submit!!", Thread.currentThread.getName) }): Runnable) completableFuture.join() } }
戻り値がCompletableFutureなので、呼び出し元で待ち合わせなどができます。呼び出している処理自体は、ClusterExecutor#submitと同じです。
3 17, 2016 8:28:15 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$5$$anonfun$apply$mcV$sp$15 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$5 INFO: Thread[remote-thread--p81-t1] Submit!!
処理自体は、Runnableで渡しますし。
クラスタ構成でも、同じですね。
it("replicated cache, submit") { withCache[String, String]("replicatedCache", 3) { cache => val completableFuture = cache.getCacheManager.executor.submit((() => { Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Submit!!", Thread.currentThread.getName) }): SerializableRunnable) completableFuture.join() } } it("distributed cache, submit") { withCache[String, String]("distributedCache", 3) { cache => val completableFuture = cache.getCacheManager.executor.submit((() => { Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Submit!!", Thread.currentThread.getName) }): SerializableRunnable) completableFuture.join() } }
ClusterExecutor#submitConsumer
最後は、ClusterExecutor#submitConsumer。こちらは、executeやsubmitと異なり、2つの関数を引数に取ります。それぞれ、FunctionとTriConsumer(Inifinispanが提供)です。
it("local cache, submitConsumer") { withCache[String, String]("localCache") { cache => val completableFuture = cache .getCacheManager .executor .submitConsumer[String]( ((manager: EmbeddedCacheManager) => { Logger .getLogger(classOf[ClusterExecutorSpec]) .infof("Thread[%s] callable!!", Thread.currentThread.getName) manager.getAddress.toString }): java.util.function.Function[_ >: EmbeddedCacheManager, _ <: String], ((address: Address, value: String, thrown: Throwable) => { Logger .getLogger(classOf[ClusterExecutorSpec]) .infof("Thread[%s] triConsumer!!, arg Address[%s], value Address[%s]", Array(Thread.currentThread.getName, address, value): _*) }): TriConsumer[_ >: Address, _ >: String, _ >: Throwable] ) completableFuture.join() } }
戻り値はやはりvoidですが、最初のFunctionでEmbeddedCacheManagerのインスタンスが渡ってきます。このFunctionでは、シリアライズ可能な任意の値を返すことができます。このFunctionは、クラスタ内の各Nodeで実行されます。今回は、実行しているAddress(Node)の文字列表現を返すことにしました。
次のTriConsumerは引数を3つ取る関数で、それぞれFunctionが実行されたAddress(Node)、Functionが返した値、例外が発生した場合はその原因が渡されます。
※詳細は、また後で
結果はCompletableFutureですが、実行結果自体はvoidとなります。
Local Cacheではこういうログになりますが
3 17, 2016 8:48:45 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$8$$anonfun$apply$mcV$sp$18 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$1 INFO: Thread[remote-thread--p136-t1] callable!! 3 17, 2016 8:48:45 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$8$$anonfun$apply$mcV$sp$18 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$1 INFO: Thread[ScalaTest-run-running-ClusterExecutorSpec] triConsumer!!, arg Address[xxxxx-17452], value Address[xxxxx-17452]
クラスタ構成では(Functionとして渡す箇所については、Serializableである必要があります)、
it("replicated cache, submitConsumer") { withCache[String, String]("replicatedCache", 3) { cache => val completableFuture = cache .getCacheManager .executor .submitConsumer[String]( ((manager: EmbeddedCacheManager) => { Logger .getLogger(classOf[ClusterExecutorSpec]) .infof("Thread[%s] callable!!", Thread.currentThread.getName) manager.getAddress.toString }): SerializableFunction[_ >: EmbeddedCacheManager, _ <: String], ((address: Address, value: String, thrown: Throwable) => { Logger .getLogger(classOf[ClusterExecutorSpec]) .infof("Thread[%s] triConsumer!!, arg Address[%s], value Address[%s]", Array(Thread.currentThread.getName, address, value): _*) }): TriConsumer[_ >: Address, _ >: String, _ >: Throwable] ) completableFuture.join() } } it("distributed cache, submitConsumer") { withCache[String, String]("distributedCache", 3) { cache => val completableFuture = cache .getCacheManager .executor .submitConsumer[String]( ((manager: EmbeddedCacheManager) => { Logger .getLogger(classOf[ClusterExecutorSpec]) .infof("Thread[%s] callable!!", Thread.currentThread.getName) manager.getAddress.toString }): SerializableFunction[_ >: EmbeddedCacheManager, _ <: String], ((address: Address, value: String, thrown: Throwable) => { Logger .getLogger(classOf[ClusterExecutorSpec]) .infof("Thread[%s] triConsumer!!, arg Address[%s], value Address[%s]", Array(Thread.currentThread.getName, address, value): _*) }): TriConsumer[_ >: Address, _ >: String, _ >: Throwable] ) completableFuture.join() } }
こういう結果になります。
## Replicated Cache 3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$2 INFO: Thread[remote-thread--p143-t1] callable!! 3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$2 INFO: Thread[remote-thread--p143-t1] triConsumer!!, arg Address[xxxxx-43208], value Address[xxxxx-43208] 3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$2 INFO: Thread[remote-thread--p147-t2] callable!! 3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$2 INFO: Thread[remote-thread--p151-t1] callable!! 3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$2 INFO: Thread[OOB-17,xxxxx-43208] triConsumer!!, arg Address[xxxxx-15247], value Address[xxxxx-15247] 3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$2 INFO: Thread[OOB-18,xxxxx-43208] triConsumer!!, arg Address[xxxxx-48219], value Address[xxxxx-48219] ## Distributed Cache 3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$3 INFO: Thread[remote-thread--p167-t2] callable!! 3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$3 INFO: Thread[remote-thread--p167-t2] triConsumer!!, arg Address[xxxxx-41623], value Address[xxxxx-41623] 3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$3 INFO: Thread[remote-thread--p171-t1] callable!! 3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$3 INFO: Thread[remote-thread--p175-t2] callable!! 3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$3 INFO: Thread[OOB-19,xxxxx-41623] triConsumer!!, arg Address[xxxxx-8835], value Address[xxxxx-8835] 3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$3 INFO: Thread[OOB-20,xxxxx-41623] triConsumer!!, arg Address[xxxxx-44991], value Address[xxxxx-44991]
わかりにくいですが、ログをよーく見るとスレッド名から、TriConsumerは呼び出し元のNodeで実行されているようです。
コードでも、確認してみましょう。
先ほどのexecute、submitとは実行されるメソッドは異なりますが、最初にLocal Node上で処理が行われます。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L234
続いて、Remote Nodeで実行。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L254-L255
引数で渡されたFunction、もしくはSerializableFunctionは、各Nodeで分散実行されます。この引数にEmbeddedCacheManagerが入っているのは、EmbeddedCacheManagerがSerializableではないからです。で、このFunctionで処理された結果を、後続のTriConsumerに渡します。
TriConsumerは、呼び出し元のNodeで実行されます。このため、TriConsumerはSerializableではありません。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L237-L241
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L257-L281
どのNodeで実行されたのかを判定できるように、Addressを渡しているんでしょうね。
で、最後にまとめてCompletableFutureとして返す、と。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L285
また、ドキュメントにも記載がありますが、TriConsumerの第2引数と第3引数は、どちらかが設定されており、その逆はnullが渡されます。ソースコード上も、そのような感じになっていますね。何かエラーがあった場合は、第3引数にその例外情報が渡されるようです。
まとめ
Infinispan 8.2.0で追加された、Cluster Executorを試してみました。
とりあえず挙動はわかったのですが、全体的に戻り値はvoidなんですねー。各Nodeで、何かしら処理を実行するだけという感じだと。
戻り値が必要な処理だったりした場合は、Distributed Executor Frameworkか、Distributed Streamsを使おうという話なのでしょうか?
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-cluster-executor