CLOVER🍀

That was when it all began.

InfinispanのCluster Executorを試す

先日、Infinispan 8.2.0.Finalがリリースされました。

Infinispan: Infinispan 8.2.0.Final is out!

追加された新機能のうち、今回はCluster Executorを試してみます。

2.2. Cluster Executor

Cluster Executorとは、クラスタ内の全Node、もしくは指定したNodeで処理を実行するための機能です。すでに似た機能としてDistributed Executor Frameworkがありますが、こちらと異なるのは実行するのにCacheが不要なこと(EmbeddedCacheManagerがあれば実行可能)なことです。

渡す処理は、シリアライズ可能である必要がありますが、これらをLambdaで簡単に書けるようにするために、org.infinispan.utilパッケージにSerializableなRunnableやFunctionが追加されています。

https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/util/SerializableRunnable.java
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/util/SerializableCallable.java
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/util/SerializableFunction.java

これらができたことと、利用する箇所の引数の型として定義されたことによってSerializableにするための交差型キャストがいらなくなったみたいです(Javaでは…)。

では、ちょっと使ってみましょう。

追記
書いた後で思いましたが、Cacheの種類を含めて書いていますけど、あんまり意味なかったですね…。そもそも、「Cacheが要らない」って書いてるのに。

準備

ビルド定義。
build.sbt

name := "embedded-cluster-executor"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.8"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xexperimental")

updateOptions := updateOptions.value.withCachedResolution(true)

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "8.2.0.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

今回は、SAMのサポートが欲しかったので、コンパイルオプションに「-Xexperimental」を入れました。

JGroupsの設定は、以下のとおり。
src/test/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.6.xsd">
    <UDP mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}"
         mcast_port="${jgroups.udp.mcast_port:46655}"
         ucast_send_buf_size="130k"
         mcast_send_buf_size="130k"
         ucast_recv_buf_size="150k"
         mcast_recv_buf_size="200k"
         ip_ttl="${jgroups.ip_ttl:2}"
         thread_naming_pattern="pl"
         enable_diagnostics="false"
         bundler_type="sender-sends-with-timer"

         thread_pool.min_threads="${jgroups.thread_pool.min_threads:2}"
         thread_pool.max_threads="${jgroups.thread_pool.max_threads:30}"
         thread_pool.keep_alive_time="60000"
         thread_pool.queue_enabled="false"

         internal_thread_pool.min_threads="${jgroups.internal_thread_pool.min_threads:5}"
         internal_thread_pool.max_threads="${jgroups.internal_thread_pool.max_threads:20}"
         internal_thread_pool.keep_alive_time="60000"
         internal_thread_pool.queue_enabled="true"
         internal_thread_pool.queue_max_size="500"

         oob_thread_pool.min_threads="${jgroups.oob_thread_pool.min_threads:20}"
         oob_thread_pool.max_threads="${jgroups.oob_thread_pool.max_threads:200}"
         oob_thread_pool.keep_alive_time="60000"
         oob_thread_pool.queue_enabled="false"
    />
    <PING/>
    <MERGE3 min_interval="10000"
            max_interval="30000"
    />
    <FD_SOCK/>
    <FD_ALL timeout="60000"
            interval="15000"
            timeout_check_interval="5000"
    />
    <VERIFY_SUSPECT timeout="5000"
    />
    <pbcast.NAKACK2 xmit_interval="1000"
                    xmit_table_num_rows="50"
                    xmit_table_msgs_per_row="1024"
                    xmit_table_max_compaction_time="30000"
                    max_msg_batch_size="100"
                    resend_last_seqno="true"
    />
    <UNICAST3 xmit_interval="500"
              xmit_table_num_rows="50"
              xmit_table_msgs_per_row="1024"
              xmit_table_max_compaction_time="30000"
              max_msg_batch_size="100"
              conn_expiry_timeout="0"
    />
    <pbcast.STABLE stability_delay="500"
                   desired_avg_gossip="5000"
                   max_bytes="1M"
    />
    <pbcast.GMS print_local_addr="true"
                join_timeout="2000"
    />
    <UFC max_credits="2m"
         min_threshold="0.40"
    />
    <MFC max_credits="2m"
         min_threshold="0.40"
    />
    <FRAG2/>
</config>

バッファサイズを絞ったのと、pbcast.GMSタイムアウトを短くしてサンプルの動作を軽くしておきました。

Infinispanの設定ファイルは、このようにLocal、Replicated、DistributedなCache、いずれも使えるようにしておきました。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd"
        xmlns="urn:infinispan:config:8.2">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container default-cache="localCache">
        <jmx duplicate-domains="true"/>
        <transport cluster="cluster" stack="udp"/>
        <local-cache name="localCache"/>
        <replicated-cache name="replicatedCache"/>
        <distributed-cache name="distributedCache"/>
    </cache-container>
</infinispan>

それでは、これらを使ってCluster Executorを使うコードを書いていきます。

テストコードの雛形

テストコードは、以下の中に書いていきます。
src/test/scala/org/littlewings/infinispan/clusterexecutor/ClusterExecutorSpec.scala

package org.littlewings.infinispan.clusterexecutor

import java.util.function.Predicate

import org.infinispan.Cache
import org.infinispan.manager.{DefaultCacheManager, EmbeddedCacheManager}
import org.infinispan.remoting.transport.Address
import org.infinispan.util.{SerializableFunction, SerializableRunnable, TriConsumer}
import org.jboss.logging.Logger
import org.scalatest.{FunSpec, Matchers}

class ClusterExecutorSpec extends FunSpec with Matchers with Serializable {
  describe("Cluster Executor Spec") {
    // ここに、テストを書く!

  }

  protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(f: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))

    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers(0).getCache[K, V](cacheName)
      f(cache)
      cache.stop()
    } finally {
      managers.foreach(_.getCache[K, V](cacheName).stop())
      managers.foreach(_.stop())
    }
  }
}

テストといっても、今回はログ出力くらいにしているのですが…。

簡単な、クラスタ構成用のメソッド付きです。

では、ドキュメントを見つつ進めていきます。
2.2. Cluster Executor

ClusterExecutor#execute

まずは、ClusterExecutorを使ってみるところと、executeを呼び出してみます。

Local Cache…というか、単一のNodeでのサンプルは、こちら。

    it("local cache, execute") {
      withCache[String, String]("localCache") { cache =>
        cache.getCacheManager.executor.execute((() => {
          Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Execute!!", Thread.currentThread.getName)
        }): Runnable)
      }
    }

EmbeddedCacheManager#executorで、ClusterExecutorを取得することができます。今回は、ClusterManager#executeを使い、Runnableを実行します。

結果のログは、こちら。

3 17, 2016 8:27:59 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$11 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$1
INFO: Thread[remote-thread--p2-t1] Execute!!

これをクラスタ環境で実行すると(Nodeは3つで、Lambdaをシリアライズできる必要があるのでSerializableRunnableを明示しています)

    it("replicated cache, execute") {
      withCache[String, String]("replicatedCache", 3) { cache =>
        cache.getCacheManager.executor.execute((() => {
          Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Execute!!", Thread.currentThread.getName)
        }): SerializableRunnable)
      }
    }

    it("distributed cache, execute") {
      withCache[String, String]("distributedCache", 3) { cache =>
        cache.getCacheManager.executor.execute((() => {
          Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Execute!!", Thread.currentThread.getName)
        }): SerializableRunnable)
      }
    }

それぞれ、3つのログが出力されます。

## Replicated Cache
3 17, 2016 8:28:04 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$mcV$sp$12 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$2
INFO: Thread[remote-thread--p9-t2] Execute!!
3 17, 2016 8:28:05 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$mcV$sp$12 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$2
INFO: Thread[remote-thread--p17-t2] Execute!!
3 17, 2016 8:28:05 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$mcV$sp$12 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$2
INFO: Thread[remote-thread--p13-t2] Execute!!

## Distributed Cache
3 17, 2016 8:28:09 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$13 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$3
INFO: Thread[remote-thread--p33-t2] Execute!!
3 17, 2016 8:28:09 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$13 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$3
INFO: Thread[remote-thread--p37-t1] Execute!!
3 17, 2016 8:28:09 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$13 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$3
INFO: Thread[remote-thread--p41-t1] Execute!!

クラスタを構成するEmbeddedCacheManagerでは、全Nodeで実行されるようです。スレッド名もログ出力しているのですが、それぞれ別のスレッドですね。

ちなみにClusterManager#executeは、最初にLocal Node、そしてRemote Nodeという順番で実行するようです。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L169
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L179

処理としては、非同期処理になるようですね。CompletableFutureをreturnしないので…。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L149
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L206

ClusterExecutor#filterTargets

先ほどのexecuteでは、クラスタが構成されている場合には全Nodeで処理が実行されましたが、ClusterExecutor#filterTargetsを利用することで、対象のNodeを絞ることができます。

    it("distributed cache, filter and execute") {
      withCache[String, String]("distributedCache", 3) { cache =>
        val self = cache.getAdvancedCache.getRpcManager.getAddress

        cache
          .getCacheManager
          .executor
          .filterTargets((address => self == address): Predicate[Address])
          .execute((() => {
            Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Execute!!", Thread.currentThread.getName)
          }): SerializableRunnable)
      }
    }

filterTargetsメソッドでは、クラスタ内の各NodeのAddressが渡ってくるので、こちらを元に実行対象のNodeを決定します。今回のコードでは、Local Nodeのみで実行するようにしました。

3 17, 2016 8:28:12 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$14 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$4
INFO: Thread[remote-thread--p57-t2] Execute!!

なお、filterTargetsメソッド自体は、execute以外にも後述のsubmit、submitConsumerでも利用できます。

ClusterExecutor#submit

ClusterExecutor#executorと似ていますが、戻り値がCompletableFutureのClusterExecutor#submit。

    it("local cache, submit") {
      withCache[String, String]("localCache") { cache =>
        val completableFuture =
          cache.getCacheManager.executor.submit((() => {
            Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Submit!!", Thread.currentThread.getName)
          }): Runnable)

        completableFuture.join()
      }
    }

戻り値がCompletableFutureなので、呼び出し元で待ち合わせなどができます。呼び出している処理自体は、ClusterExecutor#submitと同じです。

3 17, 2016 8:28:15 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$5$$anonfun$apply$mcV$sp$15 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$run$body$5
INFO: Thread[remote-thread--p81-t1] Submit!!

処理自体は、Runnableで渡しますし。

クラスタ構成でも、同じですね。

    it("replicated cache, submit") {
      withCache[String, String]("replicatedCache", 3) { cache =>
        val completableFuture =
          cache.getCacheManager.executor.submit((() => {
            Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Submit!!", Thread.currentThread.getName)
          }): SerializableRunnable)

        completableFuture.join()
      }
    }

    it("distributed cache, submit") {
      withCache[String, String]("distributedCache", 3) { cache =>
        val completableFuture =
          cache.getCacheManager.executor.submit((() => {
            Logger.getLogger(classOf[ClusterExecutorSpec]).infof("Thread[%s] Submit!!", Thread.currentThread.getName)
          }): SerializableRunnable)

        completableFuture.join()
      }
    }

ClusterExecutor#submitConsumer

最後は、ClusterExecutor#submitConsumer。こちらは、executeやsubmitと異なり、2つの関数を引数に取ります。それぞれ、FunctionとTriConsumer(Inifinispanが提供)です。

    it("local cache, submitConsumer") {
      withCache[String, String]("localCache") { cache =>
        val completableFuture =
          cache
            .getCacheManager
            .executor
            .submitConsumer[String](
            ((manager: EmbeddedCacheManager) => {
              Logger
                .getLogger(classOf[ClusterExecutorSpec])
                .infof("Thread[%s] callable!!", Thread.currentThread.getName)
              manager.getAddress.toString
            }): java.util.function.Function[_ >: EmbeddedCacheManager, _ <: String],
            ((address: Address, value: String, thrown: Throwable) => {
              Logger
                .getLogger(classOf[ClusterExecutorSpec])
                .infof("Thread[%s] triConsumer!!, arg Address[%s], value Address[%s]",
                  Array(Thread.currentThread.getName, address, value): _*)
            }): TriConsumer[_ >: Address, _ >: String, _ >: Throwable]
          )

        completableFuture.join()
      }
    }

戻り値はやはりvoidですが、最初のFunctionでEmbeddedCacheManagerのインスタンスが渡ってきます。このFunctionでは、シリアライズ可能な任意の値を返すことができます。このFunctionは、クラスタ内の各Nodeで実行されます。今回は、実行しているAddress(Node)の文字列表現を返すことにしました。

次のTriConsumerは引数を3つ取る関数で、それぞれFunctionが実行されたAddress(Node)、Functionが返した値、例外が発生した場合はその原因が渡されます。
※詳細は、また後で

結果はCompletableFutureですが、実行結果自体はvoidとなります。

Local Cacheではこういうログになりますが

3 17, 2016 8:48:45 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$8$$anonfun$apply$mcV$sp$18 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$1
INFO: Thread[remote-thread--p136-t1] callable!!
3 17, 2016 8:48:45 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$8$$anonfun$apply$mcV$sp$18 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$1
INFO: Thread[ScalaTest-run-running-ClusterExecutorSpec] triConsumer!!, arg Address[xxxxx-17452], value Address[xxxxx-17452]

クラスタ構成では(Functionとして渡す箇所については、Serializableである必要があります)、

    it("replicated cache, submitConsumer") {
      withCache[String, String]("replicatedCache", 3) { cache =>
        val completableFuture =
          cache
            .getCacheManager
            .executor
            .submitConsumer[String](
            ((manager: EmbeddedCacheManager) => {
              Logger
                .getLogger(classOf[ClusterExecutorSpec])
                .infof("Thread[%s] callable!!", Thread.currentThread.getName)
              manager.getAddress.toString
            }): SerializableFunction[_ >: EmbeddedCacheManager, _ <: String],
            ((address: Address, value: String, thrown: Throwable) => {
              Logger
                .getLogger(classOf[ClusterExecutorSpec])
                .infof("Thread[%s] triConsumer!!, arg Address[%s], value Address[%s]",
                  Array(Thread.currentThread.getName, address, value): _*)
            }): TriConsumer[_ >: Address, _ >: String, _ >: Throwable]
          )

        completableFuture.join()
      }
    }

    it("distributed cache, submitConsumer") {
      withCache[String, String]("distributedCache", 3) { cache =>
        val completableFuture =
          cache
            .getCacheManager
            .executor
            .submitConsumer[String](
            ((manager: EmbeddedCacheManager) => {
              Logger
                .getLogger(classOf[ClusterExecutorSpec])
                .infof("Thread[%s] callable!!", Thread.currentThread.getName)
              manager.getAddress.toString
            }): SerializableFunction[_ >: EmbeddedCacheManager, _ <: String],
            ((address: Address, value: String, thrown: Throwable) => {
              Logger
                .getLogger(classOf[ClusterExecutorSpec])
                .infof("Thread[%s] triConsumer!!, arg Address[%s], value Address[%s]",
                  Array(Thread.currentThread.getName, address, value): _*)
            }): TriConsumer[_ >: Address, _ >: String, _ >: Throwable]
          )

        completableFuture.join()
      }
    }

こういう結果になります。

## Replicated Cache
3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$2
INFO: Thread[remote-thread--p143-t1] callable!!
3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$2
INFO: Thread[remote-thread--p143-t1] triConsumer!!, arg Address[xxxxx-43208], value Address[xxxxx-43208]
3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$2
INFO: Thread[remote-thread--p147-t2] callable!!
3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$2
INFO: Thread[remote-thread--p151-t1] callable!!
3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$2
INFO: Thread[OOB-17,xxxxx-43208] triConsumer!!, arg Address[xxxxx-15247], value Address[xxxxx-15247]
3 17, 2016 8:48:48 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcV$sp$19 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$2
INFO: Thread[OOB-18,xxxxx-43208] triConsumer!!, arg Address[xxxxx-48219], value Address[xxxxx-48219]

## Distributed Cache
3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$3
INFO: Thread[remote-thread--p167-t2] callable!!
3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$3
INFO: Thread[remote-thread--p167-t2] triConsumer!!, arg Address[xxxxx-41623], value Address[xxxxx-41623]
3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$3
INFO: Thread[remote-thread--p171-t1] callable!!
3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$apply$body$3
INFO: Thread[remote-thread--p175-t2] callable!!
3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$3
INFO: Thread[OOB-19,xxxxx-41623] triConsumer!!, arg Address[xxxxx-8835], value Address[xxxxx-8835]
3 17, 2016 8:48:50 午後 org.littlewings.infinispan.clusterexecutor.ClusterExecutorSpec$$anonfun$1$$anonfun$apply$mcV$sp$10$$anonfun$apply$mcV$sp$20 org$littlewings$infinispan$clusterexecutor$ClusterExecutorSpec$$anonfun$$anonfun$$anonfun$$accept$body$3
INFO: Thread[OOB-20,xxxxx-41623] triConsumer!!, arg Address[xxxxx-44991], value Address[xxxxx-44991]

わかりにくいですが、ログをよーく見るとスレッド名から、TriConsumerは呼び出し元のNodeで実行されているようです。

コードでも、確認してみましょう。

先ほどのexecute、submitとは実行されるメソッドは異なりますが、最初にLocal Node上で処理が行われます。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L234

続いて、Remote Nodeで実行。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L254-L255

引数で渡されたFunction、もしくはSerializableFunctionは、各Nodeで分散実行されます。この引数にEmbeddedCacheManagerが入っているのは、EmbeddedCacheManagerがSerializableではないからです。で、このFunctionで処理された結果を、後続のTriConsumerに渡します。

TriConsumerは、呼び出し元のNodeで実行されます。このため、TriConsumerはSerializableではありません。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L237-L241
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L257-L281

どのNodeで実行されたのかを判定できるように、Addressを渡しているんでしょうね。

で、最後にまとめてCompletableFutureとして返す、と。
https://github.com/infinispan/infinispan/blob/8.2.0.Final/core/src/main/java/org/infinispan/manager/impl/ClusterExecutorImpl.java#L285

また、ドキュメントにも記載がありますが、TriConsumerの第2引数と第3引数は、どちらかが設定されており、その逆はnullが渡されます。ソースコード上も、そのような感じになっていますね。何かエラーがあった場合は、第3引数にその例外情報が渡されるようです。

まとめ

Infinispan 8.2.0で追加された、Cluster Executorを試してみました。

とりあえず挙動はわかったのですが、全体的に戻り値はvoidなんですねー。各Nodeで、何かしら処理を実行するだけという感じだと。

戻り値が必要な処理だったりした場合は、Distributed Executor Frameworkか、Distributed Streamsを使おうという話なのでしょうか?

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-cluster-executor