これは、なにをしたくて書いたもの?
InfinispanのServer Taskのソースコードを見ていたら、前に踏んでいた問題がいろいろ直っているのに気づきまして。
せっかくなので、確認しておこうかなと。
前に踏んでいた問題
このエントリーを書いた時の話なのですが、以下のようなところに引っかかりました。
- Server Taskの実行にあたり、クライアントから接続するユーザーにADMIN権限が必要
- Server Taskの実行モードを全ノード(
ALL_NODES
)にして、かつMarshallingをProtoStreamにすると実行に失敗する - Server Taskの実行モードを全ノード(
ALL_NODES
)にすると、渡したパラメーターがすべてString
になる
Infinispan Server 12.1でProtoStreamでのMarshallingを使いつつ、Server Taskを実行する - CLOVER🍀
加えて、Server Taskのインスタンスは実はシングルトンであったりするという話がありました。
渡したパラメーターがすべてString
になるという点を除いて、これらの問題は修正されているようです。
以下のissueに対する修正で実行にADMIN権限が不要になり、
[ISPN-14143] Task execution needs ADMIN permission additional to EXEC - Red Hat Issue Tracker
こちらのissueに対する修正で実行モードが全ノード(ALL_NOES
)であっても実行に失敗しなくなったようです。
また、実行モードによって権限が異なるというissueの修正の関連で
どうもServer Taskのインスタンスの生成について指定ができるようになったみたいです。
インスタンス生成については、ServerTask
インターフェースの親インターフェースである、Task
を確認します。
Task (Infinispan JavaDoc 14.0.1.Final API)
Task#getInstantiationMode
で返す値で制御できるようになったみたいです。
戻り値に使用するのはTaskInstantiationMode
列挙型で、デフォルトはSHARED
でシングルトンとなりServer Taskのインスタンスは1度だけ
作成され、リクエストを跨いで使い回されます。ISOLATED
はServer Taskの呼び出しごとにインスタンスが作成されます。
TaskInstantiationMode (Infinispan JavaDoc 14.0.1.Final API)
Infinispan Serverのガイドにも追記がありました。
ServerTask#setTaskContext
の説明を見ると、以下のように書かれています。
In most cases, implementations store this information locally and use it when tasks are actually executed. When using SHARED instantiation mode, the task should use a ThreadLocal to store the TaskContext for concurrent invocations.
TaskInstantiationMode
がSHARED
の時は、ここで渡された値をThreadLocal
で保持する必要があると書かれています。
合わせて、サンプルコードもThreadLocal
を使うように修正されています。
package example; import org.infinispan.tasks.ServerTask; import org.infinispan.tasks.TaskContext; public class HelloTask implements ServerTask<String> { private static final ThreadLocal<TaskContext> taskContext = new ThreadLocal<>(); @Override public void setTaskContext(TaskContext ctx) { taskContext.set(ctx); } @Override public String call() throws Exception { TaskContext ctx = taskContext.get(); String name = (String) ctx.getParameters().get().get("name"); return "Hello " + name; } @Override public String getName() { return "hello-task"; } }
Server Taskのインスタンスは前々からシングルトンだったので、こうする必要があるように思っていましたがドキュメントが修正されたので
答え合わせができた気になりました。
せっかく修正されたので、今回は権限とServer Taskのインスタンス化について確認してみたいと思います。
なお、Server Taskの実行モードを全ノード(ALL_NODES
)にすると、渡したパラメーターがすべてString
になるという点に関しては、
変わらないよう(変わりようがない?)気がしますね。
環境
今回の環境は、こちら。
$ java --version openjdk 17.0.4 2022-07-19 OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04) OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 17.0.4, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-131-generic", arch: "amd64", family: "unix"
Infinispan Serverは、172.18.0.2〜172.18.0.4の3つのノードで動作し、クラスターを構成しているものとします。
$ java --version openjdk 17.0.4.1 2022-08-12 OpenJDK Runtime Environment Temurin-17.0.4.1+1 (build 17.0.4.1+1) OpenJDK 64-Bit Server VM Temurin-17.0.4.1+1 (build 17.0.4.1+1, mixed mode, sharing) $ bin/server.sh --version Infinispan Server 14.0.1.Final (Flying Saucer) Copyright (C) Red Hat Inc. and/or its affiliates and other contributors License Apache License, v. 2.0. http://www.apache.org/licenses/LICENSE-2.0
起動コマンドはこちら。
$ bin/server.sh \ -b 0.0.0.0 \ -Djgroups.tcp.address=`hostname -i`
お題
今回のお題は、以下のようにします。
- Mavenプロジェクトをひとつ作成
src/main/java
側でInfinispan ServerのServer Taskを作成- 処理内容は同じで、
TaskInstantiationMode
の指定をデフォルト(SHARED
)とISOLATED
の2つのServer Taskを作成 - インスタンス化の回数を確認するために、呼び出しごとにカウントアップする処理を入れておく
- 処理対象ノードは、すべてのノードとしてMarshallingの確認を行う
- 処理内容は同じで、
- 動作確認はテストコードで行う
準備
Maven依存関係など。
<properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-tasks-api</artifactId> <version>14.0.1.Final</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-client-hotrod</artifactId> <version>14.0.1.Final</version> <scope>test</scope> </dependency> <!-- RemoteCacheManagerAdminで使用、infinispan-tasks-apiに含まれている --> <!-- <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <version>14.0.1.Final</version> <scope>test</scope> </dependency> --> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.9.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.23.1</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
Infinispan Serverの各Nodeには、管理ユーザーおよびアプリケーションユーザーを作成しておきます。
$ bin/cli.sh user create -g admin -p password ispn-admin $ bin/cli.sh user create -g application -p password ispn-user
Server Taskを作成する
では、まずはServer Taskを作成していきます。
最初はTaskInstantiationMode#SHARED
のServer Task。このServer Taskは、処理を実行する際に1度だけインスタンス化され、
あとはリクエストをまたがってインスタンスが再利用されます。
src/main/java/org/littlewings/infinispan/remote/task/SharedInstanceSummarizeServerTask.java
package org.littlewings.infinispan.remote.task; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.infinispan.Cache; import org.infinispan.stream.CacheCollectors; import org.infinispan.tasks.ServerTask; import org.infinispan.tasks.TaskContext; import org.infinispan.tasks.TaskExecutionMode; public class SharedInstanceSummarizeServerTask implements ServerTask<String> { AtomicInteger callCounter = new AtomicInteger(); ThreadLocal<TaskContext> threadLocalTaskContext = new ThreadLocal<>(); @Override public void setTaskContext(TaskContext taskContext) { this.threadLocalTaskContext.set(taskContext); } @Override public String call() throws Exception { TaskContext taskContext = threadLocalTaskContext.get(); try { Map<String, Object> parameters = taskContext.getParameters().orElse(Collections.emptyMap()); int called = callCounter.incrementAndGet(); @SuppressWarnings("unchecked") Cache<String, Integer> cache = (Cache<String, Integer>) taskContext.getCache().get(); int sum = cache.values().stream().collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(i -> i))); return String.format("%s, shared instance server task call count = %d, sum result = %d", parameters.get("message"), called, sum); } finally { threadLocalTaskContext.remove(); } } @Override public String getName() { return SharedInstanceSummarizeServerTask.class.getSimpleName(); } @Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ALL_NODES; } /* デフォルトで以下と同じ @Override public TaskInstantiationMode getInstantiationMode() { return TaskInstantiationMode.SHARED; } */ }
TaskInstantiationMode
はServerTask#getInstantiationMode
メソッドをオーバーライドして指定する必要がありますが、デフォルトが
TaskInstantiationMode#SHARED
なので今回はそのままいこうと思います。
/* デフォルトで以下と同じ @Override public TaskInstantiationMode getInstantiationMode() { return TaskInstantiationMode.SHARED; } */
TaskContext
は同時呼び出しされることに備え、ドキュメントに習ってThreadLocal
で保持します。
ThreadLocal<TaskContext> threadLocalTaskContext = new ThreadLocal<>(); @Override public void setTaskContext(TaskContext taskContext) { this.threadLocalTaskContext.set(taskContext); }
今回ドキュメントに明記されましたが、今までもこうするべきだったんでしょうね。
TaskContext
はcall
メソッドの最初に取り出して、
@Override public String call() throws Exception { TaskContext taskContext = threadLocalTaskContext.get();
最後に削除。
} finally {
threadLocalTaskContext.remove();
}
あとは、インスタンスが1度しか作成されていないことを確認するために、カウンターを用意しておきます。
AtomicInteger callCounter = new AtomicInteger();
これで、呼び出しごとにメッセージ内の値がカウントアップされていくはずです。
return String.format("%s, shared instance server task call count = %d, sum result = %d", parameters.get("message"), called, sum);
実行ノードは、全ノードが対象です。
@Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ALL_NODES; }
なお、文字列の送受信だけではなんなので、Cache
を使ったコードも入れておきました。
@SuppressWarnings("unchecked") Cache<String, Integer> cache = (Cache<String, Integer>) taskContext.getCache().get(); int sum = cache.values().stream().collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(i -> i)));
続いては、呼び出しの都度インスタンス化を行うServer Task。TaskInstantiationMode#ISOLATED
を指定するバージョンですね。
src/main/java/org/littlewings/infinispan/remote/task/IsolatedInstanceSummarizeServerTask.java
package org.littlewings.infinispan.remote.task; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.infinispan.Cache; import org.infinispan.stream.CacheCollectors; import org.infinispan.tasks.ServerTask; import org.infinispan.tasks.TaskContext; import org.infinispan.tasks.TaskExecutionMode; import org.infinispan.tasks.TaskInstantiationMode; public class IsolatedInstanceSummarizeServerTask implements ServerTask<String> { AtomicInteger callCounter = new AtomicInteger(); TaskContext taskContext; @Override public void setTaskContext(TaskContext taskContext) { this.taskContext = taskContext; } @Override public String call() throws Exception { Map<String, Object> parameters = taskContext.getParameters().orElse(Collections.emptyMap()); int called = callCounter.incrementAndGet(); @SuppressWarnings("unchecked") Cache<String, Integer> cache = (Cache<String, Integer>) taskContext.getCache().get(); int sum = cache.values().stream().collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(i -> i))); return String.format("%s, isolated instance server task call count = %d, sum result = %d", parameters.get("message"), called, sum); } @Override public String getName() { return IsolatedInstanceSummarizeServerTask.class.getSimpleName(); } @Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ALL_NODES; } @Override public TaskInstantiationMode getInstantiationMode() { return TaskInstantiationMode.ISOLATED; } }
先ほどとの違いは、まずはServerTask#getInstantiationMode
メソッドでTaskInstantiationMode#ISOLATED
を返すようにしていることですね。
@Override public TaskInstantiationMode getInstantiationMode() { return TaskInstantiationMode.ISOLATED; }
これで、呼び出し都度このServer Taskがインスタンス化されることになります。
このため、TaskContext
をThreadLocal
で保持する必要はなくなります。
TaskContext taskContext; @Override public void setTaskContext(TaskContext taskContext) { this.taskContext = taskContext; }
最後に、Server TaskはService Loaderの仕組みでロードされるため、META-INF/services/org.infinispan.tasks.ServerTask
というファイルに
今回作成したクラスのFQCNを書いておきます。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.remote.task.SharedInstanceSummarizeServerTask org.littlewings.infinispan.remote.task.IsolatedInstanceSummarizeServerTask
あとはパッケージングして
$ mvn package -DskipTests=true
Infinispan Serverにデプロイ。Server TaskのJARファイルを、server/lib
にコピーします。
$ cp /path/to/target/remote-task-instantiation-0.0.1-SNAPSHOT.jar server/lib
JARファイルを配置したら、Infinispan Serverを再起動します。
起動中のログで、作成したServer Taskがロードされているのを確認しましょう。
2022-10-25 15:05:34,561 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'org.littlewings.infinispan.remote.task.SharedInstanceSummarizeServerTask' 2022-10-25 15:05:34,561 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'org.littlewings.infinispan.remote.task.IsolatedInstanceSummarizeServerTask'
同じことを、全ノードに対して行ったらデプロイ完了です。
確認する
確認は、テストコードで行います。
作成したテストコードはこちら。
src/test/java/org/littlewings/infinispan/remote/task/RemoteTaskInstantiationTest.java
package org.littlewings.infinispan.remote.task; import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.stream.IntStream; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCacheManager; import org.infinispan.client.hotrod.RemoteCacheManagerAdmin; import org.infinispan.client.hotrod.configuration.Configuration; import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; import org.infinispan.configuration.cache.CacheMode; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; public class RemoteTaskInstantiationTest { @BeforeAll static void createCache() { Configuration configuration = new ConfigurationBuilder() .uri("hotrod://ispn-admin:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222") .build(); try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) { RemoteCacheManagerAdmin admin = manager.administration(); admin.removeCache("distCache"); org.infinispan.configuration.cache.Configuration cacheConfiguration = new org.infinispan.configuration.cache.ConfigurationBuilder() .clustering().cacheMode(CacheMode.DIST_SYNC) .security().authorization().enable() .encoding().key().mediaType("application/x-protostream") .encoding().value().mediaType("application/x-protostream") .build(); admin.getOrCreateCache("distCache", cacheConfiguration); } } <K, V> void withRemoteCache(String cacheName, Consumer<RemoteCache<K, V>> consumer) { Configuration configuration = new ConfigurationBuilder() .uri("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222") .build(); try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) { RemoteCache<K, V> cache = manager.getCache(cacheName); consumer.accept(cache); } } @Test public void instanceSharedTasks() { this.<String, Integer>withRemoteCache("distCache", cache -> { IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i)); assertThat(cache).hasSize(100); List<String> results1 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "hello")); assertThat(results1) .hasSize(3) .containsOnly( "hello, shared instance server task call count = 1, sum result = 5050", "hello, shared instance server task call count = 1, sum result = 5050", "hello, shared instance server task call count = 1, sum result = 5050" ); List<String> results2 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "world")); assertThat(results2) .hasSize(3) .containsOnly( "world, shared instance server task call count = 2, sum result = 5050", "world, shared instance server task call count = 2, sum result = 5050", "world, shared instance server task call count = 2, sum result = 5050" ); List<String> results3 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "yeah")); assertThat(results3) .hasSize(3) .containsOnly( "yeah, shared instance server task call count = 3, sum result = 5050", "yeah, shared instance server task call count = 3, sum result = 5050", "yeah, shared instance server task call count = 3, sum result = 5050" ); cache.clear(); }); } @Test public void instanceIsolatedTasks() { this.<String, Integer>withRemoteCache("distCache", cache -> { IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i)); assertThat(cache).hasSize(100); List<String> results1 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "hello")); assertThat(results1) .hasSize(3) .containsOnly( "hello, isolated instance server task call count = 1, sum result = 5050", "hello, isolated instance server task call count = 1, sum result = 5050", "hello, isolated instance server task call count = 1, sum result = 5050" ); List<String> results2 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "world")); assertThat(results2) .hasSize(3) .containsOnly( "world, isolated instance server task call count = 1, sum result = 5050", "world, isolated instance server task call count = 1, sum result = 5050", "world, isolated instance server task call count = 1, sum result = 5050" ); List<String> results3 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "yeah")); assertThat(results3) .hasSize(3) .containsOnly( "yeah, isolated instance server task call count = 1, sum result = 5050", "yeah, isolated instance server task call count = 1, sum result = 5050", "yeah, isolated instance server task call count = 1, sum result = 5050" ); cache.clear(); }); } }
テストコードの内容を説明していきます。
こちらは、テストの最初にキャッシュの作成を行うようにしています。こちらの接続で使用しているユーザーは、管理ユーザーです。
@BeforeAll static void createCache() { Configuration configuration = new ConfigurationBuilder() .uri("hotrod://ispn-admin:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222") .build(); try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) { RemoteCacheManagerAdmin admin = manager.administration(); admin.removeCache("distCache"); org.infinispan.configuration.cache.Configuration cacheConfiguration = new org.infinispan.configuration.cache.ConfigurationBuilder() .clustering().cacheMode(CacheMode.DIST_SYNC) .security().authorization().enable() .encoding().key().mediaType("application/x-protostream") .encoding().value().mediaType("application/x-protostream") .build(); admin.getOrCreateCache("distCache", cacheConfiguration); } }
キャッシュの種類は、なんとなくDistributed Cacheです。
こちらは、テスト内で使用するRemoteCache
を作成するメソッド。接続にはアプリケーション用のユーザーを使用します。
<K, V> void withRemoteCache(String cacheName, Consumer<RemoteCache<K, V>> consumer) { Configuration configuration = new ConfigurationBuilder() .uri("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222") .build(); try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) { RemoteCache<K, V> cache = manager.getCache(cacheName); consumer.accept(cache); } }
あとはテストです。
最初は、TaskInstantiationMode
をTaskInstantiationMode#SHARED
にしたServer Taskに対する呼び出し。
@Test public void instanceSharedTasks() { this.<String, Integer>withRemoteCache("distCache", cache -> { IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i)); assertThat(cache).hasSize(100); List<String> results1 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "hello")); assertThat(results1) .hasSize(3) .containsOnly( "hello, shared instance server task call count = 1, sum result = 5050", "hello, shared instance server task call count = 1, sum result = 5050", "hello, shared instance server task call count = 1, sum result = 5050" ); List<String> results2 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "world")); assertThat(results2) .hasSize(3) .containsOnly( "world, shared instance server task call count = 2, sum result = 5050", "world, shared instance server task call count = 2, sum result = 5050", "world, shared instance server task call count = 2, sum result = 5050" ); List<String> results3 = cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "yeah")); assertThat(results3) .hasSize(3) .containsOnly( "yeah, shared instance server task call count = 3, sum result = 5050", "yeah, shared instance server task call count = 3, sum result = 5050", "yeah, shared instance server task call count = 3, sum result = 5050" ); cache.clear(); }); }
呼び出しごとに、カウントアップされていますね。これは、Server Taskのインスタンスが使い回されているからです。
続いて、TaskInstantiationMode
をTaskInstantiationMode#ISOLATED
にしたServer Taskに対する呼び出し。
@Test public void instanceIsolatedTasks() { this.<String, Integer>withRemoteCache("distCache", cache -> { IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i)); assertThat(cache).hasSize(100); List<String> results1 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "hello")); assertThat(results1) .hasSize(3) .containsOnly( "hello, isolated instance server task call count = 1, sum result = 5050", "hello, isolated instance server task call count = 1, sum result = 5050", "hello, isolated instance server task call count = 1, sum result = 5050" ); List<String> results2 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "world")); assertThat(results2) .hasSize(3) .containsOnly( "world, isolated instance server task call count = 1, sum result = 5050", "world, isolated instance server task call count = 1, sum result = 5050", "world, isolated instance server task call count = 1, sum result = 5050" ); List<String> results3 = cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "yeah")); assertThat(results3) .hasSize(3) .containsOnly( "yeah, isolated instance server task call count = 1, sum result = 5050", "yeah, isolated instance server task call count = 1, sum result = 5050", "yeah, isolated instance server task call count = 1, sum result = 5050" ); cache.clear(); }); }
こちらは、カウントアップされません。呼び出しごとにServer Taskのインスタンスが作られているからですね。
というわけで、確認しておきたかったことはこれで済みました。
ソースコードを見てみる
今回の内容について、Infinispan側のソースコードを少し見ておきましょう。
まず、Server Taskのインスタンス化の制御についてはこちらです。
public T run(TaskContext context) throws Exception { final ServerTask<T> t; if (task.getInstantiationMode() == TaskInstantiationMode.ISOLATED) { t = Util.getInstance(task.getClass()); } else { t = task; } t.setTaskContext(context); if (log.isTraceEnabled()) { log.tracef("Executing task '%s' in '%s' mode using context %s", getName(), getInstantiationMode(), context); } return t.call(); }
Server Taskの実行にあたり、Infinispan Serverに接続するユーザーにADMIN権限が必要とされていた点については、Pull Requestで
ソースコードが大量に変わっていたのであんまり深追いしませんでした…。
また、MarshallerをProtoStreamにして実行ノードを全ノードにした場合にMarshallingに失敗する問題は、以下の部分で
Collections#synchronizedList
をArrayList
にしたことで解消されています。
確認は、こんなところで。
まとめ
今回は、Infinispan ServerのServer Taskで、以前にいくつか困ったことがある問題が修正されているのを確認してみました。
だいぶ使いやすくなったのではないかな、と思います。
今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-task-instantiation