CLOVER🍀

That was when it all began.

Infinispan Distributed Execution Framework

ホントは、先週書こうとしていたエントリなのですが、InfinispanのDistributed Execution Frameworkを触ってみました。

Distributed Execution Framework
https://docs.jboss.org/author/display/ISPN/Infinispan+Distributed+Execution+Framework

ドキュメントを見た時は、最初意味がわからなかったのですが…。

Distributed Execution Frameworkというのは、Infinispan上で分散処理を実行するためのフレームワークです。って、名前のままですね。

ばくっというと、以下のような感じになっています。

  • java.util.concurrent.Callable、ExecutorService、Futureを拡張して利用
  • クラスタ環境で動作している各Infinispanのインスタンスに対して、Callableのインスタンスを転送することができる
  • 結果は、ExecutorServiceに処理を依頼した側に、Futureとして返される
  • Callableは、Infinispanが拡張したインターフェースを使用することで、転送先のInfinispanに合わせたKeyのセット、そしてCacheを使用することができる

たとえば、

  Infinispan[Embedded]
          |
  Infinispan[Embedded]
          |
  Infinispan[Embedded]

みたいにInfinispanが並んでいたとして、どれかひとつに対してExecutorServiceに対して処理を依頼すると

  Infinispan[Embedded] <= ExecutorService#submitEverywhere(Callable)
          |
  Infinispan[Embedded]
          |
  Infinispan[Embedded]

インスタンスにCallableが転送されて、そこで処理が実行されます。

  Infinispan[Embedded] Callable#call
          |
  Infinispan[Embedded] Callable#call
          |
  Infinispan[Embedded] Callable#call

で、結果は元のインスタンスに対して、FutureのListとして戻ります。

  Infinispan[Embedded] => List<Future>
          |
  Infinispan[Embedded]
          |
  Infinispan[Embedded]

そんな仕組み。
*単一のインスタンスで実行してFutureが返る、submitも使えます

とりあえず、こんな環境準備をしました。
build.sbt

name := "infinispan-distributed-execution-framework-example"

version := "0.0.1"

scalaVersion := "2.10.1"

organization := "littlewings"

fork in run := true

scalacOptions += "-deprecation"

resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/"

libraryDependencies += "org.infinispan" % "infinispan-core" % "5.2.1.Final"

Scalaをしれっと2.10.1に。Infinispanは、Mavenリポジトリを見る限り3/12に5.2.5.Finalがデプロイされているようなのですが、オフィシャルサイトにはアナウンスが出てないのでいったん5.2.1.Finalのままで。

最終的に、クラスタキャッシュも使用するので、設定ファイルも用意しておきました。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:5.2 http://www.infinispan.org/schemas/infinispan-config-5.2.xsd"
    xmlns="urn:infinispan:config:5.2">
   
  <global>
    <transport clusterName="distributionFrameworkCluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics enabled="true"/>
  </global>

  <default>
    <jmxStatistics enabled="true"/>
    <clustering mode="distribution">
      <hash numOwners="1" />
      <sync />
    </clustering>
  </default>
</infinispan>

分散キャッシュを使用するので、クラスタのモードは「distribution」に、それからnumOwnersは1に絞り込んでいます。

src/main/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd">
  <UDP
      mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}"
      mcast_port="${jgroups.udp.mcast_port:45688}"
      tos="8"
      ucast_recv_buf_size="130000"
      ucast_send_buf_size="100000"
      mcast_recv_buf_size="130000"
      mcast_send_buf_size="100000"
      loopback="true"
      max_bundle_size="64000"
      max_bundle_timeout="30"
      ip_ttl="${jgroups.udp.ip_ttl:2}"
      enable_bundling="true"
      enable_unicast_bundling="true"
      enable_diagnostics="true"
      diagnostics_addr="${jboss.jgroups.diagnostics_addr:224.0.0.75}"
      diagnostics_port="${jboss.jgroups.diagnostics_port:7500}"

      thread_naming_pattern="hr-cl"

      thread_pool.enabled="true"
      thread_pool.min_threads="2"
      thread_pool.max_threads="8"
      thread_pool.keep_alive_time="5000"
      thread_pool.queue_enabled="true"
      thread_pool.queue_max_size="1000"
      thread_pool.rejection_policy="discard"

      oob_thread_pool.enabled="true"
      oob_thread_pool.min_threads="2"
      oob_thread_pool.max_threads="8"
      oob_thread_pool.keep_alive_time="1000"
      oob_thread_pool.queue_enabled="false"
      oob_thread_pool.rejection_policy="discard"
      />

  <PING timeout="3000" num_initial_members="2" />

  <!-- <FD timeout="3000" max_tries="5" /> -->
  <FD_ALL interval="3000" timeout="10000" />
  <FD_SOCK />
  <VERIFY_SUSPECT timeout="1500" />

  <UNICAST2 />
  <pbcast.NAKACK2
      use_mcast_xmit="false" 
      xmit_interval="1000"
      discard_delivered_msgs="true"
      />

  <pbcast.GMS
      print_local_addr="true"
      join_timeout="3000"
      leave_timeout="3000"
      merge_timeout="3000"
      max_bundling_time="200"
      view_bundling="true"
      />

  <UFC max_credits="500000" min_threshold="0.20" />
  <MFC max_credits="500000" min_threshold="0.20" />

  <MERGE3 max_interval="30000" min_interval="10000" />

  <pbcast.STABLE
      stability_delay="1000"
      desired_avg_gossip="50000"
      max_bytes="400000" />

  <FRAG2 frag_size="60000" />
</config>

DefaultExecutorServiceとCallableを使用する

DefaultExecutorService#submit

1番基本的なパターンです。まずは、Callableの実装として以下のようなものを用意します。
src/main/scala/MyCallable.scala

import java.util.concurrent.Callable

class MyCallable extends Callable[Integer] {
  println("create mycallable instance")

  def call(): Integer = {
    println("called")
    10
  }
}

すごく単純な、Callableの実装ですね…。

利用する側として、こんなのを用意しました。
src/main/scala/DistributedExecutionFrameworkExample.scala

import scala.collection.JavaConverters._

import java.util.Set

import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.distexec.DefaultExecutorService

object DistributedExecutionFrameworkExample {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, Integer]()

    val des = new DefaultExecutorService(cache)
    try {
      val future = des.submit(new MyCallable)
      println(s"ExecutorService#submit Result => ${future.get}")
    } finally {
      des.shutdown()

      cache.stop()
      manager.stop()
    }
  }
}

ExecutorServiceは、Infinispanが提供しているDefaultExecutorServiceクラスを使用します。インスタンス生成の際には、InfinispanのCacheが必要です。

    val des = new DefaultExecutorService(cache)

あとは、submitすればFutureが返ってきます。

      val future = des.submit(new MyCallable)
      println(s"ExecutorService#submit Result => ${future.get}")

なお、DefaultExecutorServiceクラスのコンストラクタの引数は、BasicCacheではなくCacheなので、Hot Rodでは使用できないと思われます。

では、実行してみます。

> run-main DistributedExecutionFrameworkExample
[info] Running DistributedExecutionFrameworkExample 
  〜省略〜
[info] create mycallable instance
[info] called
[info] ExecutorService#submit Result => 10
  〜省略〜
[success] Total time: 6 s, completed 2013/03/16 16:38:15

まあ、これだけだと普通のjava.util.concurrent.ExecutorServiceを使った場合となんら変わりがありません。

DefaultExecutorService#submitEverywhere

ここで、もうひとつInfinispanのインスタンスを追加します。
src/main/scala/EmbeddedCacheServer.scala

import org.infinispan.manager.DefaultCacheManager

object EmbeddedCacheServer {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]()
    cache.addListener(new LoggingListener)
  }
}

Cache#stopとDefaultCacheManager#stopを呼ばないとJavaVMが終了しないので、それを利用してちょっと浮いててもらいます。あと、LoggingListenerというクラスはまた後で説明します。

以降、最初に使っていたDistributedExecutionFrameworkExample内で動作しているInfinispanをインスタンス-A、EmbeddedCacheServerで浮かせているInfinspanをインスタンス-Bと表記します。

では、インスタンス-Bを起動。

$ sbt "run-main EmbeddedCacheServer"
[info] Set current project to infinispan-distributed-execution-framework-example (in build file:/xxxxx/infinispan-distributed-execution-framework/)
[info] Running EmbeddedCacheServer 
[error] 3 16, 2013 4:47:02 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start
[error] INFO: ISPN000078: Starting JGroups Channel
[info] 
[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-63354, cluster=distributionFrameworkCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:46321
[info] -------------------------------------------------------------------
[error] 3 16, 2013 4:47:06 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-63354|0] [ubuntu-63354]
[error] 3 16, 2013 4:47:06 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport startJGroupsChannelIfNeeded
[error] INFO: ISPN000079: Cache local address is ubuntu-63354, physical addresses are [fe80:0:0:0:20c:29ff:fe5c:cfec%2:46321]
[error] 3 16, 2013 4:47:06 午後 org.infinispan.factories.GlobalComponentRegistry start
[error] INFO: ISPN000128: Infinispan version: Infinispan 'Delirium' 5.2.1.Final
[error] 3 16, 2013 4:47:06 午後 org.infinispan.jmx.CacheJmxRegistration start
[error] INFO: ISPN000031: MBeans were successfully registered to the platform MBean server.

では、先ほどのサンプル(DistributedExecutionFrameworkExample)で

      val future = des.submit(new MyCallable)
      println(s"ExecutorService#submit Result => ${future.get}")

と書かれていた箇所を

      val futures = des.submitEverywhere(new MyCallable)
      futures.asScala.foreach(f => println(f.get))

と変更して実行します。

> run-main DistributedExecutionFrameworkExample
  〜省略〜
[error] 3 16, 2013 4:49:24 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop
[error] INFO: ISPN000080: Disconnecting and closing JGroups Channel
[error] 3 16, 2013 4:49:25 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport stop
[error] INFO: ISPN000082: Stopping the RpcDispatcher
[error] org.infinispan.CacheException: org.infinispan.marshall.NotSerializableException: MyCallable
[error] 	at org.infinispan.util.Util.cloneWithMarshaller(Util.java:259)
[error] 	at org.infinispan.distexec.DefaultExecutorService.clone(DefaultExecutorService.java:555)
[error] 	at org.infinispan.distexec.DefaultExecutorService.submitEverywhere(DefaultExecutorService.java:510)
[error] 	at org.infinispan.distexec.DefaultExecutorService.submitEverywhere(DefaultExecutorService.java:497)
  〜省略〜

Callableがシリアライズできないよ?と怒られました。

というわけで、先ほどのMyCallableクラスにSerializableを実装してリトライ。

@SerialVersionUID(1L)
class MyCallable extends Callable[Integer] with Serializable {

浮かしておいたインスタンス-Bも、再起動しておきます。では、実行。

> run-main DistributedExecutionFrameworkExample
[info] Compiling 1 Scala source to /xxxxx/infinispan-distributed-execution-framework/target/scala-2.10/classes...
[info] Compiling 2 Scala sources to /xxxxx/infinispan-distributed-execution-framework/target/scala-2.10/classes...
[info] Running DistributedExecutionFrameworkExample 
[error] 3 16, 2013 4:52:20 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport start
[error] INFO: ISPN000078: Starting JGroups Channel
[info] 
[info] -------------------------------------------------------------------
[info] GMS: address=ubuntu-13073, cluster=distributionFrameworkCluster, physical address=fe80:0:0:0:20c:29ff:fe5c:cfec%2:50966
[info] -------------------------------------------------------------------
  〜省略〜
[info] create mycallable instance
[info] 10
[info] called
[info] 10
  〜省略〜
[error] INFO: ISPN000082: Stopping the RpcDispatcher
[success] Total time: 6 s, completed 2013/03/16 16:52:22

今度は、結果が返ってきました。

インスタンス-B側でも、Callableが実行されたことがわかります。

[error] 3 16, 2013 4:52:21 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-16854|1] [ubuntu-16854, ubuntu-13073]
[info] called <---- コレ
[error] 3 16, 2013 4:52:22 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-16854|2] [ubuntu-16854]

ただ、このままだと単なるCallableを各インスタンスに転送しただけで、InfinispanのCacheを使用した処理は難しいです。できない、ってわけではないでしょうけど。

ここで利用するのが、DistributedCallableインターフェースのようです。

DefaultExecutorServiceとDistributedCallableを使用する

DefaultExecutorService#submit

DistributedCallableインターフェースは、Callableインターフェースを拡張したものです。

public interface DistributedCallable<K, V, T> extends Callable<T> {
   public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys);
}

このインターフェースを利用すると、このCallableで使用するCacheとタスクの実行時にもらったキーのSetを受け取ることができるようになります。

では、DistributedCallableインターフェースを実装したクラスを作成してみます。
src/main/scala/MyDistributedCallable.scala

import scala.collection.JavaConverters._

import org.infinispan.Cache
import org.infinispan.distexec.DistributedCallable

class MyDistributedCallable extends DistributedCallable[String, Integer, Integer] {
  var inputKeys: Set[String] = _
  var cache: Cache[String, Integer] = _

  println("create distributedcallable instance")

  def setEnvironment(cache: Cache[String, Integer], inputKeys: java.util.Set[String]): Unit = {
    println(s"Input Keys => Size:${inputKeys.size}, Values:${inputKeys}")
    this.inputKeys = Set.empty ++ inputKeys.asScala
    this.cache = cache
  }

  def call(): Integer = {
    println("called")
    inputKeys.foldLeft(0) { (acc, key) =>
      cache.get(key) match {
        case null => acc
        case v => acc + v
      }
    }
  }
}

DistributedCallable#setEnvironmentが呼ばれたタイミングで、受け取ったKey一式を出力するようにしています。

で、先ほどのDistributedExecutionFrameworkExampleオブジェクトを変更します。

  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, Integer]()

    val limit = 50
    val keys = new Array[String](limit)
    var total = 0
    for (i <- 1 to limit) {
      val key =  "key" + i
      keys(i - 1) = key
      cache.put(key, i)
      total += i
    }

    val des = new DefaultExecutorService(cache)
    try {
      val future = des.submit(new MyDistributedCallable, keys: _*)
      println(s"ExecutorService#submit Result => ${future.get}, Local Sum = ${total}")
    } finally {
      des.shutdown()

      cache.stop()
      manager.stop()
    }
  }

1〜50までのKeyとValueをCacheに突っ込みつつ、その時のKey一式をDefaultExecutorService#submitの引数に、DistributedCallableのインスタンスと一緒に渡します。
結果はFutureとして戻ってくるので、その結果と最初に自前で合算した結果を出力してお終いです。

では、実行してみます。

> run-main DistributedExecutionFrameworkExample
  〜省略〜
[info] Input Keys => Size:50, Values:[key38, key39, key34, key35, key36, key37, key42, key41, key44, key43, key40, key4, key3, key6, key5, key2, key1, key49, key10, key11, key47, key8, key48, key7, key45, key46, key9, key50, key21, key22, key20, key15, key14, key13, key12, key19, key18, key17, key16, key30, key31, key32, key33, key24, key23, key26, key25, key28, key27, key29]
[info] called
[info] ExecutorService#submit Result => 1275, Local Sum = 1275
  〜省略〜
[success] Total time: 8 s, completed 2013/03/16 17:13:00

JGroupsその他のログは、端折りました。渡したKeyがDistributedCallableに渡され、CacheからKeyを使ってValueを合算できましたね。

DefaultExecutorService#submitEverywhere

最後、DefaultExecutorService#submitEverywhereを使用します。なので、ここで再び先ほどのインスタンス-Bに登場してもらいます。
シリアライズが必要だと思われるので、MyDistributedCallableクラスはシリアライズ可能にしておきます。

@SerialVersionUID(1L)
class MyDistributedCallable extends DistributedCallable[String, Integer, Integer] with Serializable {

また、今回は分散キャッシュで使用するので、Keyがどのインスタンスに設定されたのか分かるように、CacheにLoggerを付けておくことにします。

src/main/scala/LoggingListener.scala 
import org.infinispan.notifications.Listener
import org.infinispan.notifications.cachelistener.annotation.{CacheEntryCreated, CacheEntryRemoved}
import org.infinispan.notifications.cachelistener.event.{CacheEntryCreatedEvent, CacheEntryRemovedEvent}

@Listener
class LoggingListener {
  @CacheEntryCreated
  def observeAdd(event: CacheEntryCreatedEvent[_, _]): Unit =
    if (!event.isPre)
      println(s"Cache entry with key [${event.getKey}] added in cache [${event.getCache.getName}]")
    else ()

  @CacheEntryRemoved
  def observeRemoved(event: CacheEntryRemovedEvent[_, _]): Unit =
    println(s"Cache entry with key [${event.getKey}] removed in cache [${event.getCache.getName}]")
}

src/main/scala/DistributedExecutionFrameworkExample.scala

    val cache = manager.getCache[String, Integer]()
    cache.addListener(new LoggingListener)

そして、submitEverywhereを使うように修正。

      val futures = des.submitEverywhere(new MyDistributedCallable, keys: _*)
      val callableTotal = futures.asScala.foldLeft(0) { (acc, f) => acc + f.get }
      println(s"ExecutorService#submitEverywhere Result => ${callableTotal}, Local Sum = ${total}")

ここで、インスタンス-Bを起動しておきます。

$ sbt "run-main EmbeddedCacheServer"

インスタンス-Aから実行します。

> run-main DistributedExecutionFrameworkExample
  〜省略〜
[info] Cache entry with key [key5] added in cache [___defaultcache]
[info] Cache entry with key [key9] added in cache [___defaultcache]
[info] Cache entry with key [key10] added in cache [___defaultcache]
[info] Cache entry with key [key13] added in cache [___defaultcache]
[info] Cache entry with key [key14] added in cache [___defaultcache]
[info] Cache entry with key [key16] added in cache [___defaultcache]
[info] Cache entry with key [key17] added in cache [___defaultcache]
[info] Cache entry with key [key18] added in cache [___defaultcache]
[info] Cache entry with key [key22] added in cache [___defaultcache]
[info] Cache entry with key [key23] added in cache [___defaultcache]
[info] Cache entry with key [key28] added in cache [___defaultcache]
[info] Cache entry with key [key29] added in cache [___defaultcache]
[info] Cache entry with key [key30] added in cache [___defaultcache]
[info] Cache entry with key [key32] added in cache [___defaultcache]
[info] Cache entry with key [key34] added in cache [___defaultcache]
[info] Cache entry with key [key35] added in cache [___defaultcache]
[info] Cache entry with key [key36] added in cache [___defaultcache]
[info] Cache entry with key [key38] added in cache [___defaultcache]
[info] Cache entry with key [key40] added in cache [___defaultcache]
[info] Cache entry with key [key44] added in cache [___defaultcache]
[info] Cache entry with key [key46] added in cache [___defaultcache]
[info] Cache entry with key [key49] added in cache [___defaultcache]
[info] Cache entry with key [key50] added in cache [___defaultcache]
[info] create distributedcallable instance
[info] Input Keys => Size:23, Values:[key5, key38, key49, key10, key34, key30, key35, key22, key36, key32, key46, key9, key23, key14, key44, key13, key28, key50, key18, key40, key17, key29, key16]
[info] called
[info] ExecutorService#submitEverywhere Result => 1275, Local Sum = 1275
  〜省略〜
[success] Total time: 6 s, completed 2013/03/16 17:27:59

ログから、インスタンス-Aには、KeyとValueが23個登録されたようです。DistributedCallable#setEnvironmentに渡されたKeyの数も、やっぱり23個ですね。
計算結果は、それぞれのFutureの結果を合わせると同じですよ、と。

渡ってくるKeyは、java.util.HashSetになっているので、順番はバラバラです。

一方、インスタンス-B側のコンソールはこうなっています。

[error] 3 16, 2013 5:27:58 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-37697|1] [ubuntu-37697, ubuntu-60826]
[info] Cache entry with key [key1] added in cache [___defaultcache]
[info] Cache entry with key [key2] added in cache [___defaultcache]
[info] Cache entry with key [key3] added in cache [___defaultcache]
[info] Cache entry with key [key4] added in cache [___defaultcache]
[info] Cache entry with key [key6] added in cache [___defaultcache]
[info] Cache entry with key [key7] added in cache [___defaultcache]
[info] Cache entry with key [key8] added in cache [___defaultcache]
[info] Cache entry with key [key11] added in cache [___defaultcache]
[info] Cache entry with key [key12] added in cache [___defaultcache]
[info] Cache entry with key [key15] added in cache [___defaultcache]
[info] Cache entry with key [key19] added in cache [___defaultcache]
[info] Cache entry with key [key20] added in cache [___defaultcache]
[info] Cache entry with key [key21] added in cache [___defaultcache]
[info] Cache entry with key [key24] added in cache [___defaultcache]
[info] Cache entry with key [key25] added in cache [___defaultcache]
[info] Cache entry with key [key26] added in cache [___defaultcache]
[info] Cache entry with key [key27] added in cache [___defaultcache]
[info] Cache entry with key [key31] added in cache [___defaultcache]
[info] Cache entry with key [key33] added in cache [___defaultcache]
[info] Cache entry with key [key37] added in cache [___defaultcache]
[info] Cache entry with key [key39] added in cache [___defaultcache]
[info] Cache entry with key [key41] added in cache [___defaultcache]
[info] Cache entry with key [key42] added in cache [___defaultcache]
[info] Cache entry with key [key43] added in cache [___defaultcache]
[info] Cache entry with key [key45] added in cache [___defaultcache]
[info] Cache entry with key [key47] added in cache [___defaultcache]
[info] Cache entry with key [key48] added in cache [___defaultcache]
[info] Input Keys => Size:27, Values:[key39, key21, key20, key37, key15, key42, key41, key12, key43, key19, key4, key3, key6, key2, key1, key11, key47, key8, key48, key31, key7, key45, key33, key24, key26, key25, key27]
[info] called
[error] 3 16, 2013 5:27:59 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-37697|2] [ubuntu-37697]

こちらには、残りの27個のKeyが転送されていますね。

というわけで、DistributedCallableインターフェースを使用すると、それぞれのInfinispanのインスタンスで稼働するDistributedCallableのインスタンスが使用するCacheと、Keyの整合性が取れるように割り振ってくれるようです。

このKeyを割り振ってくれるのは、便利だなぁと思いますね。

Callableインターフェースだけを実装した場合でも、submit時にKeyを渡すことはできるのですが、Callable側で受け取る術がありません。また、Cacheの面倒もみてもらえないので、素直にDistributedCallableインターフェースを使用するのが吉でしょう。

ちなみに、このサンプルをsubmitEverywhereではなく

      val futures = des.submitEverywhere(new MyDistributedCallable, keys: _*)
      val callableTotal = futures.asScala.foldLeft(0) { (acc, f) => acc + f.get }
      println(s"ExecutorService#submitEverywhere Result => ${callableTotal}, Local Sum = ${total}")

submitを使うように変更すると、

      val future = des.submit(new MyDistributedCallable, keys: _*)
      println(s"ExecutorService#submit Result => ${future.get}, Local Sum = ${total}")

どれかひとつのインスタンスでDistributedCallableのインスタンスが実行されるようになります。

インスタンス-A

> run-main DistributedExecutionFrameworkExample
  〜省略〜
[info] Cache entry with key [key5] added in cache [___defaultcache]
[info] Cache entry with key [key9] added in cache [___defaultcache]
[info] Cache entry with key [key10] added in cache [___defaultcache]
[info] Cache entry with key [key13] added in cache [___defaultcache]
[info] Cache entry with key [key14] added in cache [___defaultcache]
[info] Cache entry with key [key16] added in cache [___defaultcache]
[info] Cache entry with key [key17] added in cache [___defaultcache]
[info] Cache entry with key [key18] added in cache [___defaultcache]
[info] Cache entry with key [key22] added in cache [___defaultcache]
[info] Cache entry with key [key23] added in cache [___defaultcache]
[info] Cache entry with key [key28] added in cache [___defaultcache]
[info] Cache entry with key [key29] added in cache [___defaultcache]
[info] Cache entry with key [key30] added in cache [___defaultcache]
[info] Cache entry with key [key32] added in cache [___defaultcache]
[info] Cache entry with key [key34] added in cache [___defaultcache]
[info] Cache entry with key [key35] added in cache [___defaultcache]
[info] Cache entry with key [key36] added in cache [___defaultcache]
[info] Cache entry with key [key38] added in cache [___defaultcache]
[info] Cache entry with key [key40] added in cache [___defaultcache]
[info] Cache entry with key [key44] added in cache [___defaultcache]
[info] Cache entry with key [key46] added in cache [___defaultcache]
[info] Cache entry with key [key49] added in cache [___defaultcache]
[info] Cache entry with key [key50] added in cache [___defaultcache]
[info] create distributedcallable instance
[info] Input Keys => Size:50, Values:[key38, key39, key34, key35, key36, key37, key42, key41, key44, key43, key40, key4, key3, key6, key5, key2, key1, key49, key10, key11, key47, key8, key48, key7, key45, key46, key9, key50, key21, key22, key20, key15, key14, key13, key12, key19, key18, key17, key16, key30, key31, key32, key33, key24, key23, key26, key25, key28, key27, key29]
[info] called
[info] ExecutorService#submit Result => 1275, Local Sum = 1275
  〜省略〜
[success] Total time: 4 s, completed 2013/03/16 17:42:35

インスタンス-B

[error] 3 16, 2013 5:42:33 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-13349|7] [ubuntu-13349, ubuntu-44271]
[error] 3 16, 2013 5:42:35 午後 org.infinispan.remoting.transport.jgroups.JGroupsTransport viewAccepted
[error] INFO: ISPN000094: Received new cluster view: [ubuntu-13349|8] [ubuntu-13349]

インスタンス-Bには、JGroupsでのメンバー情報が変わったくらいのログしかなく、インスタンス-AにすべてのKeyが渡っています。ただし、結果はsubmitEverywhereを使った時と一緒。

今回はインスタンス-Aの上でDistributedCallableのインスタンスが実行されましたが、インスタンス-Bの上で実行されることもありました。実行されたインスタンス上に、CacheのKeyに対応するエントリがない場合は、結局ネットワーク通信が発生しちゃうんでしょうね。

その他、タスクが失敗した時のフェイルオーバーの設定
https://docs.jboss.org/author/display/ISPN/Infinispan+Distributed+Execution+Framework#InfinispanDistributedExecutionFramework-Distributedtaskfailover
実行ポリシーの設定
https://docs.jboss.org/author/display/ISPN/Infinispan+Distributed+Execution+Framework#InfinispanDistributedExecutionFramework-Distributedtaskexecutionpolicy
上記と合わせてタイムアウトなどの設定
https://docs.jboss.org/author/display/ISPN/Infinispan+Distributed+Execution+Framework#InfinispanDistributedExecutionFramework-DistributedExecutorService%2CDistributedTaskBuilderandDistributedTaskAPI
などもできるようですが、今回は割愛。

CDIもドキュメントに載っていましたが、実行してみた感じだとJBoss ASとか要りそうだったのでやっぱりお見送り。

でも、なかなか面白いフレームワークでした。

最後に、全コードをもう1度載せておきます。

src/main/scala/DistributedExecutionFrameworkExample.scala 
import scala.collection.JavaConverters._

import java.util.Set

import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.distexec.DefaultExecutorService

object DistributedExecutionFrameworkExample {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, Integer]()
    cache.addListener(new LoggingListener)

    val limit = 50
    val keys = new Array[String](limit)
    var total = 0
    for (i <- 1 to limit) {
      val key =  "key" + i
      keys(i - 1) = key
      cache.put(key, i)
      total += i
    }

    val des = new DefaultExecutorService(cache)
    try {
      /*
      val future = des.submit(new MyCallable)
      println(s"ExecutorService#submit Result => ${future.get}")
      */
      /*
      val futures = des.submitEverywhere(new MyCallable)
      futures.asScala.foreach(f => println(f.get))
      */

      /*
      val future = des.submit(new MyDistributedCallable, keys: _*)
      println(s"ExecutorService#submit Result => ${future.get}, Local Sum = ${total}")
      */

      val futures = des.submitEverywhere(new MyDistributedCallable, keys: _*)
      val callableTotal = futures.asScala.foldLeft(0) { (acc, f) => acc + f.get }
      println(s"ExecutorService#submitEverywhere Result => ${callableTotal}, Local Sum = ${total}")
    } finally {
      des.shutdown()

      cache.stop()
      manager.stop()
    }
  }
}

src/main/scala/MyDistributedCallable.scala

import scala.collection.JavaConverters._

import org.infinispan.Cache
import org.infinispan.distexec.DistributedCallable

@SerialVersionUID(1L)
class MyDistributedCallable extends DistributedCallable[String, Integer, Integer] with Serializable {
  var inputKeys: Set[String] = _
  var cache: Cache[String, Integer] = _

  println("create distributedcallable instance")

  def setEnvironment(cache: Cache[String, Integer], inputKeys: java.util.Set[String]): Unit = {
    println(s"Input Keys => Size:${inputKeys.size}, Values:${inputKeys}")
    this.inputKeys = Set.empty ++ inputKeys.asScala
    this.cache = cache
  }

  def call(): Integer = {
    println("called")
    inputKeys.foldLeft(0) { (acc, key) =>
      cache.get(key) match {
        case null => acc
        case v => acc + v
      }
    }
  }
}

src/main/scala/MyCallable.scala

import java.util.concurrent.Callable

@SerialVersionUID(1L)
class MyCallable extends Callable[Integer] with Serializable {
  println("create mycallable instance")

  def call(): Integer = {
    println("called")
    10
  }
}

src/main/scala/LoggingListener.scala

import org.infinispan.notifications.Listener
import org.infinispan.notifications.cachelistener.annotation.{CacheEntryCreated, CacheEntryRemoved}
import org.infinispan.notifications.cachelistener.event.{CacheEntryCreatedEvent, CacheEntryRemovedEvent}

@Listener
class LoggingListener {
  @CacheEntryCreated
  def observeAdd(event: CacheEntryCreatedEvent[_, _]): Unit =
    if (!event.isPre)
      println(s"Cache entry with key [${event.getKey}] added in cache [${event.getCache.getName}]")
    else ()

  @CacheEntryRemoved
  def observeRemoved(event: CacheEntryRemovedEvent[_, _]): Unit =
    println(s"Cache entry with key [${event.getKey}] removed in cache [${event.getCache.getName}]")
}

src/main/scala/EmbeddedCacheServer.scala

import org.infinispan.manager.DefaultCacheManager

object EmbeddedCacheServer {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]()
    cache.addListener(new LoggingListener)
  }
}