CLOVER🍀

That was when it all began.

Infinispan Server 14.0.1.Finalで、Server Taskが呼び出しごとにインスタンス化できたり、ADMIN権限が不要になったりしていた話

これは、なにをしたくて書いたもの?

InfinispanのServer Taskのソースコードを見ていたら、前に踏んでいた問題がいろいろ直っているのに気づきまして。

せっかくなので、確認しておこうかなと。

前に踏んでいた問題

このエントリーを書いた時の話なのですが、以下のようなところに引っかかりました。

  • Server Taskの実行にあたり、クライアントから接続するユーザーにADMIN権限が必要
  • Server Taskの実行モードを全ノード(ALL_NODES)にして、かつMarshallingをProtoStreamにすると実行に失敗する
  • Server Taskの実行モードを全ノード(ALL_NODES)にすると、渡したパラメーターがすべてStringになる

Infinispan Server 12.1でProtoStreamでのMarshallingを使いつつ、Server Taskを実行する - CLOVER🍀

加えて、Server Taskのインスタンスは実はシングルトンであったりするという話がありました。

渡したパラメーターがすべてStringになるという点を除いて、これらの問題は修正されているようです。

以下のissueに対する修正で実行にADMIN権限が不要になり、

[ISPN-14143] Task execution needs ADMIN permission additional to EXEC - Red Hat Issue Tracker

こちらのissueに対する修正で実行モードが全ノード(ALL_NOES)であっても実行に失敗しなくなったようです。

[ISPN-14131] ProtoStream will throw an Exception if a remote-task is executed in TaskExecutionMode.ALL_NODES - Red Hat Issue Tracker

また、実行モードによって権限が異なるというissueの修正の関連で

[ISPN-14144] Task execution (permission) is different for ALL_NODES and ONE_NODE - Red Hat Issue Tracker

どうもServer Taskのインスタンスの生成について指定ができるようになったみたいです。

ISPN-14144 Propagate subject in task execution by tristantarrant · Pull Request #10336 · infinispan/infinispan · GitHub

インスタンス生成については、ServerTaskインターフェースの親インターフェースである、Taskを確認します。

Task (Infinispan JavaDoc 14.0.1.Final API)

Task#getInstantiationModeで返す値で制御できるようになったみたいです。

Task#getInstantiationMode)

戻り値に使用するのはTaskInstantiationMode列挙型で、デフォルトはSHAREDでシングルトンとなりServer Taskのインスタンスは1度だけ
作成され、リクエストを跨いで使い回されます。ISOLATEDはServer Taskの呼び出しごとにインスタンスが作成されます。

TaskInstantiationMode (Infinispan JavaDoc 14.0.1.Final API)

Infinispan Serverのガイドにも追記がありました。

ServerTask#setTaskContextの説明を見ると、以下のように書かれています。

In most cases, implementations store this information locally and use it when tasks are actually executed. When using SHARED instantiation mode, the task should use a ThreadLocal to store the TaskContext for concurrent invocations.

Guide to Infinispan Server / Running scripts and tasks on Infinispan Server / Adding tasks to Infinispan Server deployments / Infinispan Server tasks

TaskInstantiationModeSHAREDの時は、ここで渡された値をThreadLocalで保持する必要があると書かれています。

合わせて、サンプルコードもThreadLocalを使うように修正されています。

package example;

import org.infinispan.tasks.ServerTask;
import org.infinispan.tasks.TaskContext;

public class HelloTask implements ServerTask<String> {

   private static final ThreadLocal<TaskContext> taskContext = new ThreadLocal<>();

   @Override
   public void setTaskContext(TaskContext ctx) {
      taskContext.set(ctx);
   }

   @Override
   public String call() throws Exception {
      TaskContext ctx = taskContext.get();
      String name = (String) ctx.getParameters().get().get("name");
      return "Hello " + name;
   }

   @Override
   public String getName() {
      return "hello-task";
   }

}

Server Taskのインスタンスは前々からシングルトンだったので、こうする必要があるように思っていましたがドキュメントが修正されたので
答え合わせができた気になりました。

せっかく修正されたので、今回は権限とServer Taskのインスタンス化について確認してみたいと思います。

なお、Server Taskの実行モードを全ノード(ALL_NODES)にすると、渡したパラメーターがすべてStringになるという点に関しては、
変わらないよう(変わりようがない?)気がしますね。

https://github.com/infinispan/infinispan/blob/14.0.1.Final/tasks/api/src/main/java/org/infinispan/tasks/TaskContext.java#L198-L211

環境

今回の環境は、こちら。

$ java --version
openjdk 17.0.4 2022-07-19
OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 17.0.4, vendor: Private Build, runtime: /usr/lib/jvm/java-17-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-131-generic", arch: "amd64", family: "unix"

Infinispan Serverは、172.18.0.2〜172.18.0.4の3つのノードで動作し、クラスターを構成しているものとします。

$ java --version
openjdk 17.0.4.1 2022-08-12
OpenJDK Runtime Environment Temurin-17.0.4.1+1 (build 17.0.4.1+1)
OpenJDK 64-Bit Server VM Temurin-17.0.4.1+1 (build 17.0.4.1+1, mixed mode, sharing)


$ bin/server.sh --version

Infinispan Server 14.0.1.Final (Flying Saucer)
Copyright (C) Red Hat Inc. and/or its affiliates and other contributors
License Apache License, v. 2.0. http://www.apache.org/licenses/LICENSE-2.0

起動コマンドはこちら。

$ bin/server.sh \
    -b 0.0.0.0 \
    -Djgroups.tcp.address=`hostname -i`

お題

今回のお題は、以下のようにします。

  • Mavenプロジェクトをひとつ作成
  • src/main/java側でInfinispan ServerのServer Taskを作成
    • 処理内容は同じで、TaskInstantiationModeの指定をデフォルト(SHARED)とISOLATEDの2つのServer Taskを作成
    • インスタンス化の回数を確認するために、呼び出しごとにカウントアップする処理を入れておく
    • 処理対象ノードは、すべてのノードとしてMarshallingの確認を行う
  • 動作確認はテストコードで行う

準備

Maven依存関係など。

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-tasks-api</artifactId>
            <version>14.0.1.Final</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-client-hotrod</artifactId>
            <version>14.0.1.Final</version>
            <scope>test</scope>
        </dependency>
        <!-- RemoteCacheManagerAdminで使用、infinispan-tasks-apiに含まれている -->
        <!--
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>14.0.1.Final</version>
            <scope>test</scope>
        </dependency>
        -->

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.9.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.23.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

Infinispan Serverの各Nodeには、管理ユーザーおよびアプリケーションユーザーを作成しておきます。

$  bin/cli.sh user create -g admin -p password ispn-admin
$  bin/cli.sh user create -g application -p password ispn-user

Server Taskを作成する

では、まずはServer Taskを作成していきます。

最初はTaskInstantiationMode#SHAREDのServer Task。このServer Taskは、処理を実行する際に1度だけインスタンス化され、
あとはリクエストをまたがってインスタンスが再利用されます。

src/main/java/org/littlewings/infinispan/remote/task/SharedInstanceSummarizeServerTask.java

package org.littlewings.infinispan.remote.task;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.infinispan.stream.CacheCollectors;
import org.infinispan.tasks.ServerTask;
import org.infinispan.tasks.TaskContext;
import org.infinispan.tasks.TaskExecutionMode;

public class SharedInstanceSummarizeServerTask implements ServerTask<String> {
    AtomicInteger callCounter = new AtomicInteger();

    ThreadLocal<TaskContext> threadLocalTaskContext = new ThreadLocal<>();

    @Override
    public void setTaskContext(TaskContext taskContext) {
        this.threadLocalTaskContext.set(taskContext);
    }

    @Override
    public String call() throws Exception {
        TaskContext taskContext = threadLocalTaskContext.get();

        try {
            Map<String, Object> parameters = taskContext.getParameters().orElse(Collections.emptyMap());

            int called = callCounter.incrementAndGet();

            @SuppressWarnings("unchecked")
            Cache<String, Integer> cache = (Cache<String, Integer>) taskContext.getCache().get();
            int sum = cache.values().stream().collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(i -> i)));

            return String.format("%s, shared instance server task call count = %d, sum result = %d", parameters.get("message"), called, sum);
        } finally {
            threadLocalTaskContext.remove();
        }
    }

    @Override
    public String getName() {
        return SharedInstanceSummarizeServerTask.class.getSimpleName();
    }

    @Override
    public TaskExecutionMode getExecutionMode() {
        return TaskExecutionMode.ALL_NODES;
    }

    /* デフォルトで以下と同じ
    @Override
    public TaskInstantiationMode getInstantiationMode() {
        return TaskInstantiationMode.SHARED;
    }
    */
}

TaskInstantiationModeServerTask#getInstantiationModeメソッドをオーバーライドして指定する必要がありますが、デフォルトが
TaskInstantiationMode#SHAREDなので今回はそのままいこうと思います。

    /* デフォルトで以下と同じ
    @Override
    public TaskInstantiationMode getInstantiationMode() {
        return TaskInstantiationMode.SHARED;
    }
    */

TaskContextは同時呼び出しされることに備え、ドキュメントに習ってThreadLocalで保持します。

    ThreadLocal<TaskContext> threadLocalTaskContext = new ThreadLocal<>();

    @Override
    public void setTaskContext(TaskContext taskContext) {
        this.threadLocalTaskContext.set(taskContext);
    }

今回ドキュメントに明記されましたが、今までもこうするべきだったんでしょうね。

TaskContextcallメソッドの最初に取り出して、

    @Override
    public String call() throws Exception {
        TaskContext taskContext = threadLocalTaskContext.get();

最後に削除。

        } finally {
            threadLocalTaskContext.remove();
        }

あとは、インスタンスが1度しか作成されていないことを確認するために、カウンターを用意しておきます。

    AtomicInteger callCounter = new AtomicInteger();

これで、呼び出しごとにメッセージ内の値がカウントアップされていくはずです。

            return String.format("%s, shared instance server task call count = %d, sum result = %d", parameters.get("message"), called, sum);

実行ノードは、全ノードが対象です。

    @Override
    public TaskExecutionMode getExecutionMode() {
        return TaskExecutionMode.ALL_NODES;
    }

なお、文字列の送受信だけではなんなので、Cacheを使ったコードも入れておきました。

            @SuppressWarnings("unchecked")
            Cache<String, Integer> cache = (Cache<String, Integer>) taskContext.getCache().get();
            int sum = cache.values().stream().collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(i -> i)));

続いては、呼び出しの都度インスタンス化を行うServer Task。TaskInstantiationMode#ISOLATEDを指定するバージョンですね。

src/main/java/org/littlewings/infinispan/remote/task/IsolatedInstanceSummarizeServerTask.java

package org.littlewings.infinispan.remote.task;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.infinispan.stream.CacheCollectors;
import org.infinispan.tasks.ServerTask;
import org.infinispan.tasks.TaskContext;
import org.infinispan.tasks.TaskExecutionMode;
import org.infinispan.tasks.TaskInstantiationMode;

public class IsolatedInstanceSummarizeServerTask implements ServerTask<String> {
    AtomicInteger callCounter = new AtomicInteger();

    TaskContext taskContext;

    @Override
    public void setTaskContext(TaskContext taskContext) {
        this.taskContext = taskContext;
    }

    @Override
    public String call() throws Exception {
        Map<String, Object> parameters = taskContext.getParameters().orElse(Collections.emptyMap());

        int called = callCounter.incrementAndGet();

        @SuppressWarnings("unchecked")
        Cache<String, Integer> cache = (Cache<String, Integer>) taskContext.getCache().get();
        int sum = cache.values().stream().collect(CacheCollectors.serializableCollector(() -> Collectors.summingInt(i -> i)));

        return String.format("%s, isolated instance server task call count = %d, sum result = %d", parameters.get("message"), called, sum);
    }

    @Override
    public String getName() {
        return IsolatedInstanceSummarizeServerTask.class.getSimpleName();
    }

    @Override
    public TaskExecutionMode getExecutionMode() {
        return TaskExecutionMode.ALL_NODES;
    }

    @Override
    public TaskInstantiationMode getInstantiationMode() {
        return TaskInstantiationMode.ISOLATED;
    }
}

先ほどとの違いは、まずはServerTask#getInstantiationModeメソッドでTaskInstantiationMode#ISOLATEDを返すようにしていることですね。

    @Override
    public TaskInstantiationMode getInstantiationMode() {
        return TaskInstantiationMode.ISOLATED;
    }

これで、呼び出し都度このServer Taskがインスタンス化されることになります。

このため、TaskContextThreadLocalで保持する必要はなくなります。

    TaskContext taskContext;

    @Override
    public void setTaskContext(TaskContext taskContext) {
        this.taskContext = taskContext;
    }

最後に、Server TaskはService Loaderの仕組みでロードされるため、META-INF/services/org.infinispan.tasks.ServerTaskというファイルに
今回作成したクラスのFQCNを書いておきます。

src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask

org.littlewings.infinispan.remote.task.SharedInstanceSummarizeServerTask
org.littlewings.infinispan.remote.task.IsolatedInstanceSummarizeServerTask

あとはパッケージングして

$ mvn package -DskipTests=true

Infinispan Serverにデプロイ。Server TaskのJARファイルを、server/libにコピーします。

$ cp /path/to/target/remote-task-instantiation-0.0.1-SNAPSHOT.jar server/lib

JARファイルを配置したら、Infinispan Serverを再起動します。

起動中のログで、作成したServer Taskがロードされているのを確認しましょう。

2022-10-25 15:05:34,561 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'org.littlewings.infinispan.remote.task.SharedInstanceSummarizeServerTask'
2022-10-25 15:05:34,561 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'org.littlewings.infinispan.remote.task.IsolatedInstanceSummarizeServerTask'

同じことを、全ノードに対して行ったらデプロイ完了です。

確認する

確認は、テストコードで行います。

作成したテストコードはこちら。

src/test/java/org/littlewings/infinispan/remote/task/RemoteTaskInstantiationTest.java

package org.littlewings.infinispan.remote.task;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.RemoteCacheManagerAdmin;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.configuration.cache.CacheMode;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class RemoteTaskInstantiationTest {
    @BeforeAll
    static void createCache() {
        Configuration configuration =
                new ConfigurationBuilder()
                        .uri("hotrod://ispn-admin:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222")
                        .build();

        try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
            RemoteCacheManagerAdmin admin = manager.administration();

            admin.removeCache("distCache");

            org.infinispan.configuration.cache.Configuration cacheConfiguration =
                    new org.infinispan.configuration.cache.ConfigurationBuilder()
                            .clustering().cacheMode(CacheMode.DIST_SYNC)
                            .security().authorization().enable()
                            .encoding().key().mediaType("application/x-protostream")
                            .encoding().value().mediaType("application/x-protostream")
                            .build();
            admin.getOrCreateCache("distCache", cacheConfiguration);
        }
    }

    <K, V> void withRemoteCache(String cacheName, Consumer<RemoteCache<K, V>> consumer) {
        Configuration configuration =
                new ConfigurationBuilder()
                        .uri("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222")
                        .build();


        try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
            RemoteCache<K, V> cache = manager.getCache(cacheName);

            consumer.accept(cache);
        }
    }

    @Test
    public void instanceSharedTasks() {
        this.<String, Integer>withRemoteCache("distCache", cache -> {
            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i));

            assertThat(cache).hasSize(100);

            List<String> results1 =
                    cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "hello"));

            assertThat(results1)
                    .hasSize(3)
                    .containsOnly(
                            "hello, shared instance server task call count = 1, sum result = 5050",
                            "hello, shared instance server task call count = 1, sum result = 5050",
                            "hello, shared instance server task call count = 1, sum result = 5050"
                    );

            List<String> results2 =
                    cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "world"));

            assertThat(results2)
                    .hasSize(3)
                    .containsOnly(
                            "world, shared instance server task call count = 2, sum result = 5050",
                            "world, shared instance server task call count = 2, sum result = 5050",
                            "world, shared instance server task call count = 2, sum result = 5050"
                    );

            List<String> results3 =
                    cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "yeah"));

            assertThat(results3)
                    .hasSize(3)
                    .containsOnly(
                            "yeah, shared instance server task call count = 3, sum result = 5050",
                            "yeah, shared instance server task call count = 3, sum result = 5050",
                            "yeah, shared instance server task call count = 3, sum result = 5050"
                    );

            cache.clear();
        });
    }

    @Test
    public void instanceIsolatedTasks() {
        this.<String, Integer>withRemoteCache("distCache", cache -> {
            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i));

            assertThat(cache).hasSize(100);

            List<String> results1 =
                    cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "hello"));

            assertThat(results1)
                    .hasSize(3)
                    .containsOnly(
                            "hello, isolated instance server task call count = 1, sum result = 5050",
                            "hello, isolated instance server task call count = 1, sum result = 5050",
                            "hello, isolated instance server task call count = 1, sum result = 5050"
                    );

            List<String> results2 =
                    cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "world"));

            assertThat(results2)
                    .hasSize(3)
                    .containsOnly(
                            "world, isolated instance server task call count = 1, sum result = 5050",
                            "world, isolated instance server task call count = 1, sum result = 5050",
                            "world, isolated instance server task call count = 1, sum result = 5050"
                    );

            List<String> results3 =
                    cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "yeah"));

            assertThat(results3)
                    .hasSize(3)
                    .containsOnly(
                            "yeah, isolated instance server task call count = 1, sum result = 5050",
                            "yeah, isolated instance server task call count = 1, sum result = 5050",
                            "yeah, isolated instance server task call count = 1, sum result = 5050"
                    );

            cache.clear();
        });
    }
}

テストコードの内容を説明していきます。

こちらは、テストの最初にキャッシュの作成を行うようにしています。こちらの接続で使用しているユーザーは、管理ユーザーです。

    @BeforeAll
    static void createCache() {
        Configuration configuration =
                new ConfigurationBuilder()
                        .uri("hotrod://ispn-admin:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222")
                        .build();

        try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
            RemoteCacheManagerAdmin admin = manager.administration();

            admin.removeCache("distCache");

            org.infinispan.configuration.cache.Configuration cacheConfiguration =
                    new org.infinispan.configuration.cache.ConfigurationBuilder()
                            .clustering().cacheMode(CacheMode.DIST_SYNC)
                            .security().authorization().enable()
                            .encoding().key().mediaType("application/x-protostream")
                            .encoding().value().mediaType("application/x-protostream")
                            .build();
            admin.getOrCreateCache("distCache", cacheConfiguration);
        }
    }

キャッシュの種類は、なんとなくDistributed Cacheです。

こちらは、テスト内で使用するRemoteCacheを作成するメソッド。接続にはアプリケーション用のユーザーを使用します。

    <K, V> void withRemoteCache(String cacheName, Consumer<RemoteCache<K, V>> consumer) {
        Configuration configuration =
                new ConfigurationBuilder()
                        .uri("hotrod://ispn-user:password@172.18.0.2:11222,172.18.0.3:11222,172.18.0.4:11222")
                        .build();


        try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
            RemoteCache<K, V> cache = manager.getCache(cacheName);

            consumer.accept(cache);
        }
    }

あとはテストです。

最初は、TaskInstantiationModeTaskInstantiationMode#SHAREDにしたServer Taskに対する呼び出し。

    @Test
    public void instanceSharedTasks() {
        this.<String, Integer>withRemoteCache("distCache", cache -> {
            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i));

            assertThat(cache).hasSize(100);

            List<String> results1 =
                    cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "hello"));

            assertThat(results1)
                    .hasSize(3)
                    .containsOnly(
                            "hello, shared instance server task call count = 1, sum result = 5050",
                            "hello, shared instance server task call count = 1, sum result = 5050",
                            "hello, shared instance server task call count = 1, sum result = 5050"
                    );

            List<String> results2 =
                    cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "world"));

            assertThat(results2)
                    .hasSize(3)
                    .containsOnly(
                            "world, shared instance server task call count = 2, sum result = 5050",
                            "world, shared instance server task call count = 2, sum result = 5050",
                            "world, shared instance server task call count = 2, sum result = 5050"
                    );

            List<String> results3 =
                    cache.execute("SharedInstanceSummarizeServerTask", Map.of("message", "yeah"));

            assertThat(results3)
                    .hasSize(3)
                    .containsOnly(
                            "yeah, shared instance server task call count = 3, sum result = 5050",
                            "yeah, shared instance server task call count = 3, sum result = 5050",
                            "yeah, shared instance server task call count = 3, sum result = 5050"
                    );

            cache.clear();
        });
    }

呼び出しごとに、カウントアップされていますね。これは、Server Taskのインスタンスが使い回されているからです。

続いて、TaskInstantiationModeTaskInstantiationMode#ISOLATEDにしたServer Taskに対する呼び出し。

    @Test
    public void instanceIsolatedTasks() {
        this.<String, Integer>withRemoteCache("distCache", cache -> {
            IntStream.rangeClosed(1, 100).forEach(i -> cache.put("key" + i, i));

            assertThat(cache).hasSize(100);

            List<String> results1 =
                    cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "hello"));

            assertThat(results1)
                    .hasSize(3)
                    .containsOnly(
                            "hello, isolated instance server task call count = 1, sum result = 5050",
                            "hello, isolated instance server task call count = 1, sum result = 5050",
                            "hello, isolated instance server task call count = 1, sum result = 5050"
                    );

            List<String> results2 =
                    cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "world"));

            assertThat(results2)
                    .hasSize(3)
                    .containsOnly(
                            "world, isolated instance server task call count = 1, sum result = 5050",
                            "world, isolated instance server task call count = 1, sum result = 5050",
                            "world, isolated instance server task call count = 1, sum result = 5050"
                    );

            List<String> results3 =
                    cache.execute("IsolatedInstanceSummarizeServerTask", Map.of("message", "yeah"));

            assertThat(results3)
                    .hasSize(3)
                    .containsOnly(
                            "yeah, isolated instance server task call count = 1, sum result = 5050",
                            "yeah, isolated instance server task call count = 1, sum result = 5050",
                            "yeah, isolated instance server task call count = 1, sum result = 5050"
                    );

            cache.clear();
        });
    }

こちらは、カウントアップされません。呼び出しごとにServer Taskのインスタンスが作られているからですね。

というわけで、確認しておきたかったことはこれで済みました。

ソースコードを見てみる

今回の内容について、Infinispan側のソースコードを少し見ておきましょう。

まず、Server Taskのインスタンス化の制御についてはこちらです。

   public T run(TaskContext context) throws Exception {
      final ServerTask<T> t;
      if (task.getInstantiationMode() == TaskInstantiationMode.ISOLATED) {
         t = Util.getInstance(task.getClass());
      } else {
         t = task;
      }
      t.setTaskContext(context);
      if (log.isTraceEnabled()) {
         log.tracef("Executing task '%s' in '%s' mode using context %s", getName(), getInstantiationMode(), context);
      }
      return t.call();
   }

https://github.com/infinispan/infinispan/blob/14.0.1.Final/server/runtime/src/main/java/org/infinispan/server/tasks/ServerTaskWrapper.java#L33-L45

Server Taskの実行にあたり、Infinispan Serverに接続するユーザーにADMIN権限が必要とされていた点については、Pull Requestで
ソースコードが大量に変わっていたのであんまり深追いしませんでした…。

https://github.com/infinispan/infinispan/blob/14.0.1.Final/server/runtime/src/main/java/org/infinispan/server/tasks/ServerTaskEngine.java#L93

また、MarshallerをProtoStreamにして実行ノードを全ノードにした場合にMarshallingに失敗する問題は、以下の部分で
Collections#synchronizedListArrayListにしたことで解消されています。

https://github.com/infinispan/infinispan/blob/14.0.1.Final/server/runtime/src/main/java/org/infinispan/server/tasks/DistributedServerTaskRunner.java#L30

確認は、こんなところで。

まとめ

今回は、Infinispan ServerのServer Taskで、以前にいくつか困ったことがある問題が修正されているのを確認してみました。

だいぶ使いやすくなったのではないかな、と思います。

今回作成したソースコードは、こちらに置いています。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-task-instantiation