CLOVER🍀

That was when it all began.

Infinispan Server 12.1でProtoStreamでのMarshallingを使いつつ、Server Taskを実行する

これは、なにをしたくて書いたもの?

Infinispan ServerがServerNGと呼ばれる形態になってから、Server Taskを動かしたことがないなぁと思いまして。

MarshallingもデフォルトがJBoss MarshallingからProtoStreamになったことですし、このあたりをまとめて試してみようと
思います。

Server Taskを実行するなら、分散Streamかなとも思ったのですが、まずは分散処理とProtoStreamの組み合わせを確認した方が
いいかなと思ったので、それについては先にやっておきました。

Infinispan 12.1でのMarshalling/Encodingと分散処理と - CLOVER🍀

今回はServer Taskに絞って扱いたいと思います。

なお、ServerNGになる前に、Server Taskを動かした時のエントリはこちら。

InfinispanのRemote Task Executionを使って、Infinispan Server上でタスクを実行する - CLOVER🍀

Server Task

Infninspan Serverにおける、Server Taskについてのドキュメントはこちら。

Remotely Executing Server-Side Tasks

Infinispan CLIREST API、Hot Rod Clientから呼び出せるTaskを定義して、Infinispan Serverにデプロイできます。

Taskの特徴は、以下のようなところでしょうか。

  • Taskは、Javaで作成するか、JavaScriptなどのスクリプト言語で作成する
  • Taskを実行するのが、単一のNode(リクエストを受けたNode)か全Nodeかを選択できる
    • このモードを決めるのは、デプロイされるTask自身となる
  • Taskは、EmbeddedなInfinispanの機能を使って作成できる
  • Taskは、呼び出し元からパラメーターを受け取れる
  • デプロイ方法は、以下の2つ
    • Javaで作成した場合は、JARファイルをInfinispan Serverに配置して再起動
    • スクリプトの場合は、CLIで追加したりプログラムで追加したりできる

今回は、Javaで作成するTaskを扱います。

JavaでのServer Taskの作り方は、

Creating Server Tasks

ServerTaskインターフェースを実装したクラスを作成して

Server Tasks

そのクラス名を書いたMETA-INF/services/org.infinispan.tasks.ServerTaskファイルを含めたJARファイルを作成して、
Infinispan Serverのserver/libディレクトリにコピーします。

Deploying Server Tasks to Infinispan Servers

この時、Infinispan Serverを再起動する必要があります。

では、この流れでServer Taskを作成してデプロイしていきましょう。

環境

今回の環境は、こちらです。

$ java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)


$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: $HOME.sdkman/candidates/maven/current
Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-77-generic", arch: "amd64", family: "unix"

Infinispanは12.1.6.Finalを使い、3 Node用意します。各Nodeは、171.17.0.2〜4で動作しているものとします。

お題

Taskは、こんなお題で作成しようと思います。

  • Cacheに格納するのは自作のクラスとし、Server Taskの戻り値も自作のクラスとする
    • MarshallingにはProtoStreamを使用する
  • 書籍の合計金額を計算するServer Taskを作成する
    • パラメーターも扱う

なお、周辺テーマ、ハマって切り落としたテーマはこのあたりです。

  • 認証、認可設定
  • ProtoStreamと実行モードの相性

では、進めていきましょう。

プロジェクト作成

今回は、Mavenのマルチプロジェクト構成で作成します。以下の3つのモジュールを持つ構成としましょう。
それぞれ、こんな内容にします。

  • entity … Cacheに格納するクラスや、Server Taskの戻り値に使うクラス、ProtoStreamを使ったMarshalling設定
  • task … Server Task
  • client … Server Taskを実行するテストコード

以降、それぞれ順次載せていきます。

親モジュール

全体の親モジュールの設定は、こちら。といっても、pom.xmlだけですが。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.littlewings</groupId>
    <artifactId>remote-task-serverng-proto</artifactId>
    <packaging>pom</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <modules>
        <module>entity</module>
        <module>task</module>
        <module>client</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
</project>

entityモジュール

entityモジュールのpom.xmlは、こちら。

entity/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>remote-task-serverng-proto</artifactId>
        <groupId>org.littlewings</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>entity</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.infinispan.protostream</groupId>
            <artifactId>protostream-processor</artifactId>
            <version>4.4.1.Final</version>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>

依存関係は、protostream-processorだけです。

書籍クラス。こちらは、Cacheに格納します。

entity/src/main/java/org/littlewings/infinispan/entity/Book.java

package org.littlewings.infinispan.entity;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;

public class Book {
    @ProtoField(number = 1)
    String isbn;

    @ProtoField(number = 2)
    String title;

    @ProtoField(number = 3, defaultValue = "0")
    int price;

    @ProtoFactory
    public static Book create(String isbn, String title, int price) {
        Book book = new Book();

        book.setIsbn(isbn);
        book.setTitle(title);
        book.setPrice(price);

        return book;
    }

    // getter/setterは省略
}

続いて、Server Taskの戻り値に使う結果クラス。こちらも、ProtoStreamでMarshallingする設定にしています。

entity/src/main/java/org/littlewings/infinispan/entity/Result.java

package org.littlewings.infinispan.entity;

import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;

public class Result {
    @ProtoField(number = 1, defaultValue = "0")
    int value;

    @ProtoFactory
    public static Result create(int value) {
        Result result = new Result();

        result.setValue(value);

        return result;
    }

    // getter/setterは省略
}

この2つのクラスのMarshallingの定義を生成するための、SerializationContextInitializerの定義。

entity/src/main/java/org/littlewings/infinispan/entity/EntitiesInitializer.java

package org.littlewings.infinispan.entity;

import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;

@AutoProtoSchemaBuilder(
        includeClasses = {Book.class, Result.class},
        schemaFileName = "entities.proto",
        schemaFilePath = "proto",
        schemaPackageName = "org.littlewings.infinispan.entity"
)
public interface EntitiesInitializer extends SerializationContextInitializer {
}

これで、Cacheに格納するクラスとServer Taskの戻り値に使うクラスの定義は終わりです。

taskモジュール

続いて、taskモジュール。こちらのドキュメントに沿って作成します。

Server Tasks

pom.xmlは、こんな感じです。

task/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>remote-task-serverng-proto</artifactId>
        <groupId>org.littlewings</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>task</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-tasks-api</artifactId>
            <version>12.1.6.Final</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.littlewings</groupId>
            <artifactId>entity</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

依存関係はinfinispan-tasks-apiと、先ほど作成したentityモジュールを追加しています。

そして、Server Taskの定義。

task/src/main/java/org/littlewings/infinispan/task/PriceSumTask.java

package org.littlewings.infinispan.task;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.infinispan.tasks.ServerTask;
import org.infinispan.tasks.TaskContext;
import org.infinispan.tasks.TaskExecutionMode;
import org.jboss.logging.Logger;
import org.littlewings.infinispan.entity.Book;
import org.littlewings.infinispan.entity.Result;

public class PriceSumTask implements ServerTask<Result> {
    TaskContext taskContext;

    @Override
    public void setTaskContext(TaskContext taskContext) {
        Logger logger = Logger.getLogger(PriceSumTask.class);
        logger.infof("set task context");
        this.taskContext = taskContext;
    }

    @Override
    public Result call() throws Exception {
        Logger logger = Logger.getLogger(PriceSumTask.class);
        logger.infof("start task");

        Map<String, ?> parameters = taskContext.getParameters().orElse(Collections.emptyMap());

        int greaterThanPrice;

        if (parameters.containsKey("greaterThanPrice")) {
            greaterThanPrice = (Integer) parameters.get("greaterThanPrice");  // ALL_NODESの時は値はStringのみ
        } else {
            greaterThanPrice = 0;
        }

        Cache<String, Book> cache = (Cache<String, Book>) taskContext.getCache().orElseThrow();

        logger.infof("execution mode = %s, cache size = %d, greaterThan = %d", getExecutionMode(), cache.size(), greaterThanPrice);

        Result result = Result.create(
                cache
                        .values()
                        .stream()
                        .filter(book -> book.getPrice() > greaterThanPrice)
                        .map(book -> {
                            Logger l = Logger.getLogger(PriceSumTask.class);
                            l.infof("map entry = %s", book.getIsbn());
                            return book.getPrice();
                        })
                        .collect(() -> Collectors.summingInt(price -> price))
        );

        logger.infof("end task, result = %d", result.getValue());

        return result;
    }

    @Override
    public String getName() {
        return "price-sum-task";
    }

    @Override
    public TaskExecutionMode getExecutionMode() {
        return TaskExecutionMode.ONE_NODE;  // default, ONE_NODE
    }
}

ServerTaskインターフェースを実装し、戻り値はentityモジュールで作成したResultクラスとします。

public class PriceSumTask implements ServerTask<Result> {

setTaskContextメソッドでは、TaskContextを受け取ります。このTaskContextから、Cacheやパラメーターを取得します。

    @Override
    public void setTaskContext(TaskContext taskContext) {
        Logger logger = Logger.getLogger(PriceSumTask.class);
        logger.infof("set task context");
        this.taskContext = taskContext;
    }

Server Taskには名前を指定する必要があり、これはgetNameメソッドで定義します。この名前は、Client側からServer Taskを
呼び出す時に使用します。

    @Override
    public String getName() {
        return "price-sum-task";
    }

実行モード。このServer Taskが単一Nodeで実行されるのか、全Nodeで実行されるのかを定義します。このメソッドは
オーバーライド必須ではなく、デフォルトでは単一Nodeで実行されます。今回は明示しました。

    @Override
    public TaskExecutionMode getExecutionMode() {
        return TaskExecutionMode.ONE_NODE;  // default, ONE_NODE
    }

そして、最後はServer Taskの処理内容です。

    @Override
    public Result call() throws Exception {
        Logger logger = Logger.getLogger(PriceSumTask.class);
        logger.infof("start task");

        Map<String, ?> parameters = taskContext.getParameters().orElse(Collections.emptyMap());

        int greaterThanPrice;

        if (parameters.containsKey("greaterThanPrice")) {
            greaterThanPrice = (Integer) parameters.get("greaterThanPrice");  // ALL_NODESの時は値はStringのみ
        } else {
            greaterThanPrice = 0;
        }

        Cache<String, Book> cache = (Cache<String, Book>) taskContext.getCache().orElseThrow();

        logger.infof("execution mode = %s, cache size = %d, greaterThan = %d", getExecutionMode(), cache.size(), greaterThanPrice);

        Result result = Result.create(
                cache
                        .values()
                        .stream()
                        .filter(book -> book.getPrice() > greaterThanPrice)
                        .map(book -> {
                            Logger l = Logger.getLogger(PriceSumTask.class);
                            l.infof("map entry = %s", book.getIsbn());
                            return book.getPrice();
                        })
                        .collect(() -> Collectors.summingInt(price -> price))
        );

        logger.infof("end task, result = %d", result.getValue());

        return result;
    }

Cacheに格納されているBookクラスの価格を合算するのですが、パラメーターで合算する価格の下限を任意で指定できるように
しています。まあ、パラメーターを使う例として…。

あと、Server Task自身がSerializableを要求されないように、ログ出力しつつもローカル変数で処理が完結するように
しています。

作成したクラスの名前は、META-INF/services/org.infinispan.tasks.ServerTaskファイルに定義してService Loaderから
読み込まれるようにしておきます。

task/src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask

org.littlewings.infinispan.task.PriceSumTask

こちらで自動生成してもよかった気はしますが、今回は明示的に作ることにしました。

META-INF/services generator -

ところで、Serializableについてですが。Server TaskをSerializableにした場合や、戻り値でJava標準のシリアライズを使う場合は
こちらに沿ってMarshallerをJavaSerializationMarshallerに設定するとともに、デシリアライズ可能なクラスのリストに
追加する必要があります。

Allowing deserialization of Java classes

このあたりも、こちらで触れています。

Infinispan 12.1でのMarshalling/Encodingと分散処理と - CLOVER🍀

今回は、Java標準のシリアライズの仕組みは使いません。

仮に設定したとすると、以下のようになります。
※Infinispan Server側の設定です

   <cache-container name="default" statistics="true">
      <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
      <security>
         <authorization/>
      </security>
      <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller">
         <allow-list>
            <regex>org\.littlewings\.infinispan\.task\..+</regex>
            <!-- その他、必要に応じて追加  -->
         </allow-list>
      </serialization>
   </cache-container>

Infinispan ServerにServer Taskをデプロイする

ここまでで、Infinispan Serverにデプロイするクラスができあがったので、パッケージングしてデプロイしましょう。

こちらのドキュメントに沿って進めます。

Deploying Server Tasks to Infinispan Servers

まずは、パッケージング。

$ mvn -pl task -am packag

作成されたJARファイル。

$ ll entity/target/entity-0.0.1-SNAPSHOT.jar task/target/task-0.0.1-SNAPSHOT.jar
-rw-rw-r-- 1 xxxxx xxxxx 9213  7月 10 21:32 entity/target/entity-0.0.1-SNAPSHOT.jar
-rw-rw-r-- 1 xxxxx xxxxx 8528  7月 10 21:32 task/target/task-0.0.1-SNAPSHOT.jar

こちらを、Infinispan Serverのserver/libディレクトリにコピーします。

$ cp /path/to/entity/target/entity-0.0.1-SNAPSHOT.jar /path/to/task/target/task-0.0.1-SNAPSHOT.jar server/lib

コピーしたら、Infinispan Serverを再起動してください。

Infinispan ServerがServer Taskを認識すると、以下のように「Extensionをロードした」というログが出力されます。

2021-07-10 12:47:27,529 INFO  (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'org.littlewings.infinispan.task.PriceSumTask'

この操作を、クラスタを構成するInfinispan Server分だけ行います。今回は、3 Nodeですね。

これで、Server Taskのデプロイは完了です。

ちなみに、server/libディレクトリにはREADME.mdファイルがあり、カスタムJARファイルを配置するための
ディレクトリであることが書かれています。

server/lib/README.txt

Place any custom jars you wish to add to the server classpath in this directory.

clientモジュールでの、テストコードの雛形

最後は、デプロイしたServer Taskを動作確認を行うためのテストコードを書いていきます。

まずはpom.xmlから。

client/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>remote-task-serverng-proto</artifactId>
        <groupId>org.littlewings</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>client</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-client-hotrod</artifactId>
            <version>12.1.6.Final</version>
        </dependency>
        <dependency>
            <groupId>org.littlewings</groupId>
            <artifactId>entity</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <!-- for cache administration -->
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>12.1.6.Final</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.20.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>
</project>

Server Taskを実行するために必要なのはHot Rod Clientだけなので、infinispan-client-hotrodと今回使用するentityモジュールが
依存関係にあれば十分です。

なのですが、Cacheの定義はテストコード内で作成しようと思うので、infinispan-coreも追加しています。

あとはJUnit 5とAssertJをテスト用ライブラリとして指定。

テストコードの雛形は、こちらです。

client/src/test/java/org/littlewings/infinispan/client/TaskClientTest.java

package org.littlewings.infinispan.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.RemoteCacheManagerAdmin;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.configuration.cache.CacheMode;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.littlewings.infinispan.entity.Book;
import org.littlewings.infinispan.entity.Result;

import static org.assertj.core.api.Assertions.*;

public class TaskClientTest {
    List<Book> books =
            List.of(
                    Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344),
                    Book.create("978-1849518222", "Infinispan Data Grid Platform", 3608),
                    Book.create("978-0359439379", "The Apache Ignite Book", 7686),
                    Book.create("978-1365732355", "High Performance in-memory computing with Apache Ignite", 6342),
                    Book.create("978-1789347531", "Apache Ignite Quick Start Guide: Distributed data caching and processing made easy", 3638),
                    Book.create("978-1785285332", "Getting Started with Hazelcast - Second Edition: Get acquainted with the highly scalable data grid, Hazelcast, and learn how to bring its powerful in-memory features into your application", 4209),
                    Book.create("978-1617295522", "Spark in Action, Second Edition: Covers Apache Spark 3 with Examples in Java, Python, and Scala", 6297),
                    Book.create("978-1484257807", "Beginning Apache Spark Using Azure Databricks: Unleashing Large Cluster Analytics in the Cloud", 4817),
                    Book.create("978-1788997829", "Apache Kafka Quick Start Guide: Leverage Apache Kafka 2.0 to simplify real-time data processing for distributed applications", 3516),
                    Book.create("978-1491936160", "Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale", 4989)
            );

    // ここに、Cacheのセットアップやテストコードを書く
}

フィールドに定義しているのは、今回のテスト内で扱うデータですね。

では、ここからもうちょっと書いていきましょう。

Cacheを作成と認証設定

テストの最初に、Cacheを削除・作成するようにしましょう。以下のようなコードを、テストの実行前に1度実行します。

    @BeforeAll
    static void createCache() {
        Configuration configuration =
                new ConfigurationBuilder()
                        .uri("hotrod://ispn-admin:admin-password@172.17.0.2:11222,172.17.0.3:11222,172.17.0.4:11222")
                        .build();

        try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
            RemoteCacheManagerAdmin admin = manager.administration();

            admin.removeCache("bookCache");

            org.infinispan.configuration.cache.Configuration cacheConfiguration =
                    new org.infinispan.configuration.cache.ConfigurationBuilder()
                            .clustering().cacheMode(CacheMode.DIST_SYNC)
                            .security().authorization().enable()
                            .encoding().key().mediaType("application/x-protostream")
                            .encoding().value().mediaType("application/x-protostream")
                            .build();
            admin.getOrCreateCache("bookCache", cacheConfiguration);
        }
    }

RemoteCacheManagerAdminを使って、Cacheをプログラムで定義します。このために、infinispan-coreを依存関係に
追加しました。

Hot Rodで接続する際のURIにユーザーのIDとパスワードを指定していますが、これは次に作成します。
Infinispan 12.1.0.Finalからデフォルトで認証が必要になっているようなので、この挙動に従います。
※これまではデフォルトでは認証は無効でした

Infinispan 12.1.0.Final

作成しているCacheは、以下の定義です。

エンコーディングは今回必須ではないのですが、指定しておかないとInfinispan Serverから警告されるので、明示的しておくことに
します。

なお、Cacheは最初に削除して、それから再定義する形を取っています。

認証・認可設定

認証、認可が必要になるので、Infinispan Server側に設定を行います。
ここで紹介する操作は、クラスタを構成する各Infinispan Serverそれぞれに行います。

まずは、ユーザーを作成します。

$ bin/cli.sh user create ispn-admin -p admin-password -g admin

ユーザーは、adminグループに所属するようにしました。

Adding Credentials

次に、認可設定を行います。

Server Taskを実行するためには、EXEC権限が必要です。

Cache permissions

認可設定は簡単のため、ClusterRoleMapperを使用しましょう。

Cluster role mappers

ClusterRoleMapperはデフォルトの認可設定で、ユーザーが所属するグループ名に合わせて権限が決まります。

User Roles and Permissions

デフォルトの認可設定なので以下の状態ですでにClusterRoleMapperが使われることになるのですが、

   <cache-container name="default" statistics="true">
      <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
      <security>
         <authorization/>
      </security>
   </cache-container>

明示的に設定するなら、以下のようになります。

   <cache-container name="default" statistics="true">
      <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/>
      <security>
         <authorization>
            <cluster-role-mapper/>
         </authorization>
      </security>
   </cache-container>

今回は、ALL権限を持ったadminグループを使います。

実際のCacheの定義

これで、こちらのコードで定義したCacheを実際に作成すると

    @BeforeAll
    static void createCache() {
        Configuration configuration =
                new ConfigurationBuilder()
                        .uri("hotrod://ispn-admin:admin-password@172.17.0.2:11222,172.17.0.3:11222,172.17.0.4:11222")
                        .build();

        try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
            RemoteCacheManagerAdmin admin = manager.administration();

            admin.removeCache("bookCache");

            org.infinispan.configuration.cache.Configuration cacheConfiguration =
                    new org.infinispan.configuration.cache.ConfigurationBuilder()
                            .clustering().cacheMode(CacheMode.DIST_SYNC)
                            .security().authorization().enable()
                            .encoding().key().mediaType("application/x-protostream")
                            .encoding().value().mediaType("application/x-protostream")
                            .build();
            admin.getOrCreateCache("bookCache", cacheConfiguration);
        }
    }

このようになります。

server/data/caches.xml

<?xml version="1.0"?>
<infinispan xmlns="urn:infinispan:config:12.1">
    <cache-container>
        <distributed-cache mode="SYNC" remote-timeout="17500" name="bookCache" statistics="true">
            <encoding>
                <key media-type="application/x-protostream"/>
                <value media-type="application/x-protostream"/>
            </encoding>
            <locking concurrency-level="1000" acquire-timeout="15000" striping="false"/>
            <security>
                <authorization enabled="true"/>
            </security>
            <state-transfer timeout="60000"/>
        </distributed-cache>
    </cache-container>
</infinispan>

テストコード内でInfinispan Serverに接続するためのメソッド定義

テストコード内では、Infinispan Server内に接続する必要があります。この定義は、以下のようにまとめました。

    <K, V> void withRemoteCache(String cacheName, Consumer<RemoteCache<K, V>> func) {
        Configuration configuration =
                new ConfigurationBuilder()
                        .uri("hotrod://ispn-admin:admin-password@172.17.0.2:11222,172.17.0.3:11222,172.17.0.4:11222"
                                + "?context-initializers=org.littlewings.infinispan.entity.EntitiesInitializerImpl")
                        .build();

        try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) {
            RemoteCache<K, V> cache = manager.getCache(cacheName);

            func.accept(cache);
        }
    }

context-initializersで、entityモジュールで自動生成されたSerializationContextInitializerインターフェースの実装クラスを
指定しているところがポイントですね。

Server Taskを実行する

それでは、Server Taskを呼び出してみます。

パラメーターなし、あり、それぞれで、以下のようなテストを作成しました。

    @Test
    public void singleNodeTask() {
        this.<String, Book>withRemoteCache("bookCache", cache -> {
            books.forEach(b -> cache.put(b.getIsbn(), b));

            Result result = cache.execute("price-sum-task", Collections.emptyMap());

            assertThat(result.getValue()).isEqualTo(50446);

            cache.clear();
            assertThat(cache).isEmpty();
        });
    }

    @Test
    public void singleNodeTaskWithParameters() {
        this.<String, Book>withRemoteCache("bookCache", cache -> {
            books.forEach(b -> cache.put(b.getIsbn(), b));

            Map<String, Object> parameters = Map.of("greaterThanPrice", 5000);

            Result result = cache.execute("price-sum-task", parameters);

            assertThat(result.getValue()).isEqualTo(25669);

            cache.clear();
            assertThat(cache).isEmpty();
        });
    }

Server Taskの呼び出し自体はとても簡単で、RemoteCache#executeで呼び出すServer Task名と、Server Taskに必要な
パラメーターをMapで指定します。

            // パラメーターなし
            Result result = cache.execute("price-sum-task", Collections.emptyMap());


            // パラメーターあり
            Map<String, Object> parameters = Map.of("greaterThanPrice", 5000);

            Result result = cache.execute("price-sum-task", parameters);

実行すると、こんな感じでInfinispan Server上で分散実行されていることが確認できます。

パラメーターなしの場合。

## Infinispan Server (172.17.0.4)

2021-07-10 14:06:09,434 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] set task context
2021-07-10 14:06:09,435 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] start task
2021-07-10 14:06:09,439 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] execution mode = ONE_NODE, cache size = 10, greaterThan = 0
2021-07-10 14:06:09,441 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1785285332
2021-07-10 14:06:09,441 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1789347531
2021-07-10 14:06:09,442 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1782169970
2021-07-10 14:06:09,443 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-0359439379
2021-07-10 14:06:09,443 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1484257807
2021-07-10 14:06:09,443 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1491936160
2021-07-10 14:06:09,443 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1365732355
2021-07-10 14:06:09,444 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1849518222
2021-07-10 14:06:09,444 INFO  (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] end task, result = 50446


## Infinspan Server(172.17.0.3)

2021-07-10 14:06:09,442 INFO  (jgroups-22,129663e80859-52991) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1617295522
2021-07-10 14:06:09,442 INFO  (jgroups-22,129663e80859-52991) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1788997829

パラメーターありの場合。

## Infinispan Server(172.17.0.2)

2021-07-10 14:08:58,613 INFO  (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] set task context
2021-07-10 14:08:58,614 INFO  (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] start task
2021-07-10 14:08:58,615 INFO  (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] execution mode = ONE_NODE, cache size = 10, greaterThan = 5000
2021-07-10 14:08:58,616 INFO  (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1617295522
2021-07-10 14:08:58,617 INFO  (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1782169970
2021-07-10 14:08:58,617 INFO  (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-0359439379
2021-07-10 14:08:58,618 INFO  (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] end task, result = 25669


## Infinispan Server(172.17.0.3)

2021-07-10 14:08:58,617 INFO  (jgroups-25,129663e80859-52991) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1365732355

Server Taskの実行モードこそONE_NODE(単一Node)ですが、Server Taskから起動される処理内で分散処理などを行い、
クラスタ全体に跨る操作をすることは可能です。

これで、デプロイしたServer Taskが動作していることが確認できました、と

オマケ

ここでは、いくつか内部を追ってみたことを書こうかなと思います。

Server Taskのインスタンスが生成されるタイミングと個数は?

Server Taskは、各Infinispan Serverで単一のインスタンスになります。

Infinispan Serverの起動時にExtentionとしてService Loaderの仕組みでロードされ、インスタンス化されます。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/Extensions.java#L85-L90

ところで、単一のインスタンスがゆえに、TaskContextをsetterで受け取ると並行性が気になるところですが、
これはServer Task側ではどう対策するのがいいんでしょうね?ThreadLocalでしょうか?

あるServer Taskの実行は、リクエストがキューで管理されていたり、同期化されていたりするわけではないので…。

Server Taskの実行の様子は?

Server Taskの実行は、Infinispan Serverでリクエストを受けるとまずはTaskRequestProcessorによってハンドリングされます。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/hotrod/src/main/java/org/infinispan/server/hotrod/TaskRequestProcessor.java#L39

https://github.com/infinispan/infinispan/blob/12.1.6.Final/tasks/manager/src/main/java/org/infinispan/tasks/impl/TaskManagerImpl.java#L92

そして、ServerTaskEngineによって実行モードが単一Nodeか全NodeかでServerTaskRunnerが決められ、実際のServer Taskを
起動します。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/ServerTaskEngine.java#L68-L82

単一Nodeの場合のServerTaskRunnerはこちら。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/LocalServerTaskRunner.java

実際のServer TaskのインスタンスをラップしたServerTaskWrapper越しに、Server Taskを呼び出すだけです。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/ServerTaskWrapper.java

ここまででは動作させていませんが、全Nodeの場合はこちらのクラスが使われます。
※なぜ動作させなかったのは、後述します

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/DistributedServerTaskRunner.java

そして、Cluster Executorの仕組みで全Node上でServer Taskを実行します。

Cluster Executor

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/DistributedServerTaskRunner.java#L28

https://github.com/infinispan/infinispan/blob/12.1.6.Final/core/src/main/java/org/infinispan/manager/impl/LocalClusterExecutor.java#L94

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/DistributedServerTask.java

まとめ?

ServerNGとなったInfinispan Serverで、ProtoStreamでMarshallingしつつServer Taskを使ってみました。

動かすまで一苦労だったのですが、Marshalling、認証・認可などいろいろ勉強になりました…。

この後に書いている「ハマったこと?」で記載しているソースコードも含めて、以下に置いています。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-task-serverng-proto

ハマったこと?

ここまでの動作確認と、結論を出すにはいろいろとハマったことがありまして。

そちらを紹介したいと思います。

ADMIN権限を持っていないと、Server Taskを実行できない

ところでClusterRoleMapperの説明を見ていると、EXEC権限を含むapplicationグループにユーザーを作れば良さそうですが、
どうやらそれでは権限が足りないようで…。

認可設定が有効な場合、Server Taskの実行時に権限の有無を確認するのですが

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/ServerTaskEngine.java#L95

この時に、権限確認の操作を行うためにADMIN権限が必要になっていて、事実上今はADMIN権限がないとServer Taskを
実行できなさそうです。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/core/src/main/java/org/infinispan/security/impl/SecureCacheImpl.java#L564

認可を有効にすると、CacheはSecureCacheImplでラップされるようです。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java#L690

このため、今はこうなります、と。

仮に、applicationグループのユーザーでServer Taskを実行した場合は、以下のように例外がスローされます。

7月 10, 2021 10:53:59 午後 org.infinispan.client.hotrod.impl.protocol.Codec20 checkForErrorsInResponseStatus
WARN: ISPN004005: Error received from the server: java.lang.SecurityException: ISPN000287: Unauthorized access: subject 'Subject with principal(s): [ispn-app, RolePrincipal{name='application'}, InetAddressPrincipal [address=172.17.0.1/172.17.0.1]]' lacks 'ADMIN' permission

org.infinispan.client.hotrod.exceptions.HotRodClientException:Request for messageId=61 returned server error (status=0x85): java.lang.SecurityException: ISPN000287: Unauthorized access: subject 'Subject with principal(s): [ispn-app, RolePrincipal{name='application'}, InetAddressPrincipal [address=172.17.0.1/172.17.0.1]]' lacks 'ADMIN' permission

そのため、今回は全権限(ALL)を持ったadminグループのユーザーを使いました。

実行モードを全Nodeにできない

今回、Server Taskの実行モードを単一Node(ONE_NODE)としましたが、全Node(ALL_NODES)とすることが
できませんでした。

これは、Marshallingの仕組みをProtoStreamにしている場合、です。

理由はこちらで、実行モードをALL_NODESとすると、Hot Rod Clientには結果がList(各Nodeでの実行結果が含まれる)と
なるのですが、これがCollections#synchronizedListで作成されます。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/DistributedServerTaskRunner.java#L30

ProtoStreamには、JDKで用意されているクラスのいくつかにはデフォルトでMarshallerを用意しています。

ProtoStream types

https://github.com/infinispan/protostream/tree/4.4.1.Final/types/src/main/java/org/infinispan/protostream/types/java

ですが、この中にはCollections#synchronizedListに対応するものがないため、Server Taskの実行完了時にMarshallingできずに
失敗します。

まあ、全体を跨る処理は単一Nodeでの実行で分散処理をすることになる気がするので、そんなに困らない気はするの
ですが…どうでしょう?

ProtoStreamでCollections#synchronizedListをMarshallingできないことは、テストコードで簡単に確認できます。

以下が、Collections#synchronizedListArrayListをそれぞれProtoStreamでMarshallingするテストコードです。

    @Test
    public void failMarshallingSynchronizedList() {
        this.<String, Book>withRemoteCache("bookCache", cache -> {
            Marshaller marshaller = cache.getRemoteCacheManager().getMarshaller();

            List<String> list = Collections.synchronizedList(new ArrayList<>());
            list.add("Hello");
            list.add("World");

            assertThatThrownBy(() -> marshaller.objectToByteBuffer(list))
                    .hasMessageContaining("No marshaller registered for object of Java type java.util.Collections$SynchronizedRandomAccessList")
                    .isInstanceOf(IllegalArgumentException.class);
        });
    }

    @Test
    public void marshallingArrayList() {
        this.<String, Book>withRemoteCache("bookCache", cache -> {
            Marshaller marshaller = cache.getRemoteCacheManager().getMarshaller();

            List<String> list = new ArrayList<>();
            list.add("Hello");
            list.add("World");

            try {
                byte[] binary = marshaller.objectToByteBuffer(list);
                assertThat(binary).hasSize(67);

                assertThat((List<String>) marshaller.objectFromByteBuffer(binary))
                        .isEqualTo(list)
                        .isInstanceOf(ArrayList.class)
                        .containsExactly("Hello", "World");
            } catch (IOException | InterruptedException | ClassNotFoundException e) {
                fail("fail", e);
            }
        });
    }

ArrayListの方はうまくいきますが、Collections#synchronizedListの方は失敗します。

これが、Infinispan ServerにデプロイしたServer Taskの実行時にも発生します。

なお、全Node(ALL_NODES)の実行モードで作成したServer Taskはデプロイ自体は可能ですし、Marshallingの仕組みを
Java標準のシリアライズにすれば実行可能になります。

おそらく、Cacheに保存するデータはProtoStreamでエンコーディングし、Server Taskの実行時にHot Rod Clientと
Infinispan Serverの間のやり取りをJava標準のシリアライズの仕組みとすれば、データの保存はProtoStreamのままでも
良いかもしれません。が、そこまでは試していません。

Hot Rod DataFormat API

ちなみに、ServerNGとなる前まではCollections#synchronizedListではなくArrayListだったようです。

https://github.com/infinispan/infinispan/blob/8.2.12.Final/server/integration/infinispan/src/main/java/org/infinispan/server/infinispan/task/DistributedServerTaskRunner.java

実行モードを全Nodeにすると、パラメーターがStringに強制変換される

それでもと、全Node(ALL_NODES)で動かしてみたのですが、ちょっと驚くことにこの場合はパラメーターが
Stringに強制変換されます。

以下が、単一Node向けに作成したServer Taskの、全Node(ALL_NODES)版です。

task/src/main/java/org/littlewings/infinispan/task/PriceSumAllNodesTask.java

package org.littlewings.infinispan.task;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.infinispan.tasks.ServerTask;
import org.infinispan.tasks.TaskContext;
import org.infinispan.tasks.TaskExecutionMode;
import org.jboss.logging.Logger;
import org.littlewings.infinispan.entity.Book;
import org.littlewings.infinispan.entity.Result;

public class PriceSumAllNodesTask implements ServerTask<Result> {
    TaskContext taskContext;

    @Override
    public void setTaskContext(TaskContext taskContext) {
        Logger logger = Logger.getLogger(PriceSumAllNodesTask.class);
        logger.infof("set task context");
        this.taskContext = taskContext;
    }

    @Override
    public Result call() throws Exception {
        Logger logger = Logger.getLogger(PriceSumAllNodesTask.class);
        logger.infof("start task");

        Map<String, ?> parameters = taskContext.getParameters().orElse(Collections.emptyMap());

        int greaterThanPrice;

        if (parameters.containsKey("greaterThanPrice")) {
            greaterThanPrice = Integer.parseInt((String) parameters.get("greaterThanPrice"));  // ALL_NODESの時は値はStringのみ
        } else {
            greaterThanPrice = 0;
        }

        Cache<String, Book> cache = (Cache<String, Book>) taskContext.getCache().orElseThrow();

        logger.infof("execution mode = %s, cache size = %d, greaterThan = %d", getExecutionMode(), cache.size(), greaterThanPrice);

        Result result = Result.create(
                cache
                        .values()
                        .stream()
                        .filter(book -> book.getPrice() > greaterThanPrice)
                        .map(book -> {
                            Logger l = Logger.getLogger(PriceSumAllNodesTask.class);
                            l.infof("map entry = %s", book.getIsbn());
                            return book.getPrice();
                        })
                        .collect(() -> Collectors.summingInt(price -> price))
        );

        logger.infof("end task, result = %d", result.getValue());

        return result;
    }

    @Override
    public String getName() {
        return "price-sum-all-nodes-task";
    }

    @Override
    public TaskExecutionMode getExecutionMode() {
        return TaskExecutionMode.ALL_NODES;  // default, ONE_NODE
    }
}

Service Loaderの仕組みでロードするファイルは、こうなります。

task/src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask

org.littlewings.infinispan.task.PriceSumTask
org.littlewings.infinispan.task.PriceSumAllNodesTask

実行モードは当然違うのですが

    @Override
    public TaskExecutionMode getExecutionMode() {
        return TaskExecutionMode.ALL_NODES;  // default, ONE_NODE
    }

その他に変わった差異として、パラメーターがStringになっています。

        if (parameters.containsKey("greaterThanPrice")) {
            greaterThanPrice = Integer.parseInt((String) parameters.get("greaterThanPrice"));  // ALL_NODESの時は値はStringのみ
        } else {
            greaterThanPrice = 0;
        }

これはどうしたことだろう?と思ったのですが、意図的に(?)Object#toStringしていますね。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/DistributedServerTaskRunner.java#L37-L38

これは、パラメーターをProtoStreamでMarshallingする関係上、型を決めないといけなかったのかな?と思うのですが、
どうでしょうか。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/TaskParameter.java

なお、単一Nodeの場合は、このようなことは発生しません。

https://github.com/infinispan/infinispan/blob/12.1.6.Final/server/runtime/src/main/java/org/infinispan/server/tasks/LocalServerTaskRunner.java

Collections#synchronizedListがMarshallingできないことも含めて、全Nodeでの実行モードで動かした時のテストコードはこちらです。

    @Test
    public void allNodesTask() {
        this.<String, Book>withRemoteCache("bookCache", cache -> {
            books.forEach(b -> cache.put(b.getIsbn(), b));

            assertThatThrownBy(() -> cache.execute("price-sum-all-nodes-task", Collections.emptyMap()))
                    .isInstanceOf(HotRodClientException.class)
                    .hasMessage("org.infinispan.commons.marshall.MarshallingException: ISPN000615: Unable to unmarshall 'java.util.Collections$SynchronizedRandomAccessList' as a marshaller is not present in the user or global SerializationContext");

            cache.clear();
            assertThat(cache).isEmpty();
        });
    }

    @Test
    public void allNodesTaskWithParameters() {
        this.<String, Book>withRemoteCache("bookCache", cache -> {
            books.forEach(b -> cache.put(b.getIsbn(), b));

            Map<String, Object> parameters = Map.of("greaterThanPrice", 5000);

            assertThatThrownBy(() -> cache.execute("price-sum-all-nodes-task", parameters))
                    .isInstanceOf(HotRodClientException.class)
                    .hasMessage("org.infinispan.commons.marshall.MarshallingException: ISPN000615: Unable to unmarshall 'java.util.Collections$SynchronizedRandomAccessList' as a marshaller is not present in the user or global SerializationContext");

            cache.clear();
            assertThat(cache).isEmpty();
        });
    }

ハマったことと、その記録でした…。