これは、なにをしたくて書いたもの?
Infinispan ServerがServerNGと呼ばれる形態になってから、Server Taskを動かしたことがないなぁと思いまして。
MarshallingもデフォルトがJBoss MarshallingからProtoStreamになったことですし、このあたりをまとめて試してみようと
思います。
Server Taskを実行するなら、分散Streamかなとも思ったのですが、まずは分散処理とProtoStreamの組み合わせを確認した方が
いいかなと思ったので、それについては先にやっておきました。
Infinispan 12.1でのMarshalling/Encodingと分散処理と - CLOVER🍀
今回はServer Taskに絞って扱いたいと思います。
なお、ServerNGになる前に、Server Taskを動かした時のエントリはこちら。
InfinispanのRemote Task Executionを使って、Infinispan Server上でタスクを実行する - CLOVER🍀
Server Task
Infninspan Serverにおける、Server Taskについてのドキュメントはこちら。
Remotely Executing Server-Side Tasks
Infinispan CLI、REST API、Hot Rod Clientから呼び出せるTaskを定義して、Infinispan Serverにデプロイできます。
Taskの特徴は、以下のようなところでしょうか。
- Taskは、Javaで作成するか、JavaScriptなどのスクリプト言語で作成する
- Taskを実行するのが、単一のNode(リクエストを受けたNode)か全Nodeかを選択できる
- このモードを決めるのは、デプロイされるTask自身となる
- Taskは、EmbeddedなInfinispanの機能を使って作成できる
- Taskは、呼び出し元からパラメーターを受け取れる
- デプロイ方法は、以下の2つ
今回は、Javaで作成するTaskを扱います。
JavaでのServer Taskの作り方は、
ServerTask
インターフェースを実装したクラスを作成して
そのクラス名を書いたMETA-INF/services/org.infinispan.tasks.ServerTask
ファイルを含めたJARファイルを作成して、
Infinispan Serverのserver/lib
ディレクトリにコピーします。
Deploying Server Tasks to Infinispan Servers
この時、Infinispan Serverを再起動する必要があります。
では、この流れでServer Taskを作成してデプロイしていきましょう。
環境
今回の環境は、こちらです。
$ java --version openjdk 11.0.11 2021-04-20 OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04) OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-77-generic", arch: "amd64", family: "unix"
Infinispanは12.1.6.Finalを使い、3 Node用意します。各Nodeは、171.17.0.2〜4で動作しているものとします。
お題
Taskは、こんなお題で作成しようと思います。
- Cacheに格納するのは自作のクラスとし、Server Taskの戻り値も自作のクラスとする
- MarshallingにはProtoStreamを使用する
- 書籍の合計金額を計算するServer Taskを作成する
- パラメーターも扱う
なお、周辺テーマ、ハマって切り落としたテーマはこのあたりです。
- 認証、認可設定
- ProtoStreamと実行モードの相性
では、進めていきましょう。
プロジェクト作成
今回は、Mavenのマルチプロジェクト構成で作成します。以下の3つのモジュールを持つ構成としましょう。
それぞれ、こんな内容にします。
- entity … Cacheに格納するクラスや、Server Taskの戻り値に使うクラス、ProtoStreamを使ったMarshalling設定
- task … Server Task
- client … Server Taskを実行するテストコード
以降、それぞれ順次載せていきます。
親モジュール
全体の親モジュールの設定は、こちら。といっても、pom.xml
だけですが。
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.littlewings</groupId> <artifactId>remote-task-serverng-proto</artifactId> <packaging>pom</packaging> <version>0.0.1-SNAPSHOT</version> <modules> <module>entity</module> <module>task</module> <module>client</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> </project>
entityモジュール
entityモジュールのpom.xml
は、こちら。
entity/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>remote-task-serverng-proto</artifactId> <groupId>org.littlewings</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>entity</artifactId> <dependencies> <dependency> <groupId>org.infinispan.protostream</groupId> <artifactId>protostream-processor</artifactId> <version>4.4.1.Final</version> <optional>true</optional> </dependency> </dependencies> </project>
依存関係は、protostream-processor
だけです。
書籍クラス。こちらは、Cacheに格納します。
entity/src/main/java/org/littlewings/infinispan/entity/Book.java
package org.littlewings.infinispan.entity; import org.infinispan.protostream.annotations.ProtoFactory; import org.infinispan.protostream.annotations.ProtoField; public class Book { @ProtoField(number = 1) String isbn; @ProtoField(number = 2) String title; @ProtoField(number = 3, defaultValue = "0") int price; @ProtoFactory public static Book create(String isbn, String title, int price) { Book book = new Book(); book.setIsbn(isbn); book.setTitle(title); book.setPrice(price); return book; } // getter/setterは省略 }
続いて、Server Taskの戻り値に使う結果クラス。こちらも、ProtoStreamでMarshallingする設定にしています。
entity/src/main/java/org/littlewings/infinispan/entity/Result.java
package org.littlewings.infinispan.entity; import org.infinispan.protostream.annotations.ProtoFactory; import org.infinispan.protostream.annotations.ProtoField; public class Result { @ProtoField(number = 1, defaultValue = "0") int value; @ProtoFactory public static Result create(int value) { Result result = new Result(); result.setValue(value); return result; } // getter/setterは省略 }
この2つのクラスのMarshallingの定義を生成するための、SerializationContextInitializer
の定義。
entity/src/main/java/org/littlewings/infinispan/entity/EntitiesInitializer.java
package org.littlewings.infinispan.entity; import org.infinispan.protostream.SerializationContextInitializer; import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder; @AutoProtoSchemaBuilder( includeClasses = {Book.class, Result.class}, schemaFileName = "entities.proto", schemaFilePath = "proto", schemaPackageName = "org.littlewings.infinispan.entity" ) public interface EntitiesInitializer extends SerializationContextInitializer { }
これで、Cacheに格納するクラスとServer Taskの戻り値に使うクラスの定義は終わりです。
taskモジュール
続いて、taskモジュール。こちらのドキュメントに沿って作成します。
pom.xml
は、こんな感じです。
task/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>remote-task-serverng-proto</artifactId> <groupId>org.littlewings</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>task</artifactId> <dependencies> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-tasks-api</artifactId> <version>12.1.6.Final</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.littlewings</groupId> <artifactId>entity</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies> </project>
依存関係はinfinispan-tasks-api
と、先ほど作成したentityモジュールを追加しています。
そして、Server Taskの定義。
task/src/main/java/org/littlewings/infinispan/task/PriceSumTask.java
package org.littlewings.infinispan.task; import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; import org.infinispan.Cache; import org.infinispan.tasks.ServerTask; import org.infinispan.tasks.TaskContext; import org.infinispan.tasks.TaskExecutionMode; import org.jboss.logging.Logger; import org.littlewings.infinispan.entity.Book; import org.littlewings.infinispan.entity.Result; public class PriceSumTask implements ServerTask<Result> { TaskContext taskContext; @Override public void setTaskContext(TaskContext taskContext) { Logger logger = Logger.getLogger(PriceSumTask.class); logger.infof("set task context"); this.taskContext = taskContext; } @Override public Result call() throws Exception { Logger logger = Logger.getLogger(PriceSumTask.class); logger.infof("start task"); Map<String, ?> parameters = taskContext.getParameters().orElse(Collections.emptyMap()); int greaterThanPrice; if (parameters.containsKey("greaterThanPrice")) { greaterThanPrice = (Integer) parameters.get("greaterThanPrice"); // ALL_NODESの時は値はStringのみ } else { greaterThanPrice = 0; } Cache<String, Book> cache = (Cache<String, Book>) taskContext.getCache().orElseThrow(); logger.infof("execution mode = %s, cache size = %d, greaterThan = %d", getExecutionMode(), cache.size(), greaterThanPrice); Result result = Result.create( cache .values() .stream() .filter(book -> book.getPrice() > greaterThanPrice) .map(book -> { Logger l = Logger.getLogger(PriceSumTask.class); l.infof("map entry = %s", book.getIsbn()); return book.getPrice(); }) .collect(() -> Collectors.summingInt(price -> price)) ); logger.infof("end task, result = %d", result.getValue()); return result; } @Override public String getName() { return "price-sum-task"; } @Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ONE_NODE; // default, ONE_NODE } }
ServerTask
インターフェースを実装し、戻り値はentityモジュールで作成したResult
クラスとします。
public class PriceSumTask implements ServerTask<Result> {
setTaskContext
メソッドでは、TaskContext
を受け取ります。このTaskContext
から、Cacheやパラメーターを取得します。
@Override public void setTaskContext(TaskContext taskContext) { Logger logger = Logger.getLogger(PriceSumTask.class); logger.infof("set task context"); this.taskContext = taskContext; }
Server Taskには名前を指定する必要があり、これはgetName
メソッドで定義します。この名前は、Client側からServer Taskを
呼び出す時に使用します。
@Override public String getName() { return "price-sum-task"; }
実行モード。このServer Taskが単一Nodeで実行されるのか、全Nodeで実行されるのかを定義します。このメソッドは
オーバーライド必須ではなく、デフォルトでは単一Nodeで実行されます。今回は明示しました。
@Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ONE_NODE; // default, ONE_NODE }
そして、最後はServer Taskの処理内容です。
@Override public Result call() throws Exception { Logger logger = Logger.getLogger(PriceSumTask.class); logger.infof("start task"); Map<String, ?> parameters = taskContext.getParameters().orElse(Collections.emptyMap()); int greaterThanPrice; if (parameters.containsKey("greaterThanPrice")) { greaterThanPrice = (Integer) parameters.get("greaterThanPrice"); // ALL_NODESの時は値はStringのみ } else { greaterThanPrice = 0; } Cache<String, Book> cache = (Cache<String, Book>) taskContext.getCache().orElseThrow(); logger.infof("execution mode = %s, cache size = %d, greaterThan = %d", getExecutionMode(), cache.size(), greaterThanPrice); Result result = Result.create( cache .values() .stream() .filter(book -> book.getPrice() > greaterThanPrice) .map(book -> { Logger l = Logger.getLogger(PriceSumTask.class); l.infof("map entry = %s", book.getIsbn()); return book.getPrice(); }) .collect(() -> Collectors.summingInt(price -> price)) ); logger.infof("end task, result = %d", result.getValue()); return result; }
Cacheに格納されているBook
クラスの価格を合算するのですが、パラメーターで合算する価格の下限を任意で指定できるように
しています。まあ、パラメーターを使う例として…。
あと、Server Task自身がSerializable
を要求されないように、ログ出力しつつもローカル変数で処理が完結するように
しています。
作成したクラスの名前は、META-INF/services/org.infinispan.tasks.ServerTask
ファイルに定義してService Loaderから
読み込まれるようにしておきます。
task/src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.task.PriceSumTask
こちらで自動生成してもよかった気はしますが、今回は明示的に作ることにしました。
ところで、Serializable
についてですが。Server TaskをSerializable
にした場合や、戻り値でJava標準のシリアライズを使う場合は
こちらに沿ってMarshallerをJavaSerializationMarshaller
に設定するとともに、デシリアライズ可能なクラスのリストに
追加する必要があります。
Allowing deserialization of Java classes
このあたりも、こちらで触れています。
Infinispan 12.1でのMarshalling/Encodingと分散処理と - CLOVER🍀
仮に設定したとすると、以下のようになります。
※Infinispan Server側の設定です
<cache-container name="default" statistics="true"> <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/> <security> <authorization/> </security> <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> <allow-list> <regex>org\.littlewings\.infinispan\.task\..+</regex> <!-- その他、必要に応じて追加 --> </allow-list> </serialization> </cache-container>
Infinispan ServerにServer Taskをデプロイする
ここまでで、Infinispan Serverにデプロイするクラスができあがったので、パッケージングしてデプロイしましょう。
こちらのドキュメントに沿って進めます。
Deploying Server Tasks to Infinispan Servers
まずは、パッケージング。
$ mvn -pl task -am packag
作成されたJARファイル。
$ ll entity/target/entity-0.0.1-SNAPSHOT.jar task/target/task-0.0.1-SNAPSHOT.jar -rw-rw-r-- 1 xxxxx xxxxx 9213 7月 10 21:32 entity/target/entity-0.0.1-SNAPSHOT.jar -rw-rw-r-- 1 xxxxx xxxxx 8528 7月 10 21:32 task/target/task-0.0.1-SNAPSHOT.jar
こちらを、Infinispan Serverのserver/lib
ディレクトリにコピーします。
$ cp /path/to/entity/target/entity-0.0.1-SNAPSHOT.jar /path/to/task/target/task-0.0.1-SNAPSHOT.jar server/lib
コピーしたら、Infinispan Serverを再起動してください。
Infinispan ServerがServer Taskを認識すると、以下のように「Extensionをロードした」というログが出力されます。
2021-07-10 12:47:27,529 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'org.littlewings.infinispan.task.PriceSumTask'
この操作を、クラスタを構成するInfinispan Server分だけ行います。今回は、3 Nodeですね。
これで、Server Taskのデプロイは完了です。
ちなみに、server/lib
ディレクトリにはREADME.md
ファイルがあり、カスタムJARファイルを配置するための
ディレクトリであることが書かれています。
server/lib/README.txt
Place any custom jars you wish to add to the server classpath in this directory.
clientモジュールでの、テストコードの雛形
最後は、デプロイしたServer Taskを動作確認を行うためのテストコードを書いていきます。
まずはpom.xml
から。
client/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>remote-task-serverng-proto</artifactId> <groupId>org.littlewings</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>client</artifactId> <dependencies> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-client-hotrod</artifactId> <version>12.1.6.Final</version> </dependency> <dependency> <groupId>org.littlewings</groupId> <artifactId>entity</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <!-- for cache administration --> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <version>12.1.6.Final</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.7.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.7.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.20.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build> </project>
Server Taskを実行するために必要なのはHot Rod Clientだけなので、infinispan-client-hotrod
と今回使用するentity
モジュールが
依存関係にあれば十分です。
なのですが、Cacheの定義はテストコード内で作成しようと思うので、infinispan-core
も追加しています。
あとはJUnit 5とAssertJをテスト用ライブラリとして指定。
テストコードの雛形は、こちらです。
client/src/test/java/org/littlewings/infinispan/client/TaskClientTest.java
package org.littlewings.infinispan.client; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Consumer; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCacheManager; import org.infinispan.client.hotrod.RemoteCacheManagerAdmin; import org.infinispan.client.hotrod.configuration.Configuration; import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; import org.infinispan.client.hotrod.exceptions.HotRodClientException; import org.infinispan.commons.marshall.Marshaller; import org.infinispan.configuration.cache.CacheMode; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.littlewings.infinispan.entity.Book; import org.littlewings.infinispan.entity.Result; import static org.assertj.core.api.Assertions.*; public class TaskClientTest { List<Book> books = List.of( Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344), Book.create("978-1849518222", "Infinispan Data Grid Platform", 3608), Book.create("978-0359439379", "The Apache Ignite Book", 7686), Book.create("978-1365732355", "High Performance in-memory computing with Apache Ignite", 6342), Book.create("978-1789347531", "Apache Ignite Quick Start Guide: Distributed data caching and processing made easy", 3638), Book.create("978-1785285332", "Getting Started with Hazelcast - Second Edition: Get acquainted with the highly scalable data grid, Hazelcast, and learn how to bring its powerful in-memory features into your application", 4209), Book.create("978-1617295522", "Spark in Action, Second Edition: Covers Apache Spark 3 with Examples in Java, Python, and Scala", 6297), Book.create("978-1484257807", "Beginning Apache Spark Using Azure Databricks: Unleashing Large Cluster Analytics in the Cloud", 4817), Book.create("978-1788997829", "Apache Kafka Quick Start Guide: Leverage Apache Kafka 2.0 to simplify real-time data processing for distributed applications", 3516), Book.create("978-1491936160", "Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale", 4989) ); // ここに、Cacheのセットアップやテストコードを書く }
フィールドに定義しているのは、今回のテスト内で扱うデータですね。
では、ここからもうちょっと書いていきましょう。
Cacheを作成と認証設定
テストの最初に、Cacheを削除・作成するようにしましょう。以下のようなコードを、テストの実行前に1度実行します。
@BeforeAll static void createCache() { Configuration configuration = new ConfigurationBuilder() .uri("hotrod://ispn-admin:admin-password@172.17.0.2:11222,172.17.0.3:11222,172.17.0.4:11222") .build(); try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) { RemoteCacheManagerAdmin admin = manager.administration(); admin.removeCache("bookCache"); org.infinispan.configuration.cache.Configuration cacheConfiguration = new org.infinispan.configuration.cache.ConfigurationBuilder() .clustering().cacheMode(CacheMode.DIST_SYNC) .security().authorization().enable() .encoding().key().mediaType("application/x-protostream") .encoding().value().mediaType("application/x-protostream") .build(); admin.getOrCreateCache("bookCache", cacheConfiguration); } }
RemoteCacheManagerAdmin
を使って、Cacheをプログラムで定義します。このために、infinispan-core
を依存関係に
追加しました。
Hot Rodで接続する際のURIにユーザーのIDとパスワードを指定していますが、これは次に作成します。
Infinispan 12.1.0.Finalからデフォルトで認証が必要になっているようなので、この挙動に従います。
※これまではデフォルトでは認証は無効でした
作成しているCacheは、以下の定義です。
- Distributed Cache
- 認可を有効
- エンコーディングはProtoStream
エンコーディングは今回必須ではないのですが、指定しておかないとInfinispan Serverから警告されるので、明示的しておくことに
します。
なお、Cacheは最初に削除して、それから再定義する形を取っています。
認証・認可設定
認証、認可が必要になるので、Infinispan Server側に設定を行います。
ここで紹介する操作は、クラスタを構成する各Infinispan Serverそれぞれに行います。
まずは、ユーザーを作成します。
$ bin/cli.sh user create ispn-admin -p admin-password -g admin
ユーザーは、admin
グループに所属するようにしました。
次に、認可設定を行います。
Server Taskを実行するためには、EXEC
権限が必要です。
認可設定は簡単のため、ClusterRoleMapper
を使用しましょう。
ClusterRoleMapper
はデフォルトの認可設定で、ユーザーが所属するグループ名に合わせて権限が決まります。
デフォルトの認可設定なので以下の状態ですでにClusterRoleMapper
が使われることになるのですが、
<cache-container name="default" statistics="true"> <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/> <security> <authorization/> </security> </cache-container>
明示的に設定するなら、以下のようになります。
<cache-container name="default" statistics="true"> <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/> <security> <authorization> <cluster-role-mapper/> </authorization> </security> </cache-container>
今回は、ALL
権限を持ったadmin
グループを使います。
実際のCacheの定義
これで、こちらのコードで定義したCacheを実際に作成すると
@BeforeAll static void createCache() { Configuration configuration = new ConfigurationBuilder() .uri("hotrod://ispn-admin:admin-password@172.17.0.2:11222,172.17.0.3:11222,172.17.0.4:11222") .build(); try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) { RemoteCacheManagerAdmin admin = manager.administration(); admin.removeCache("bookCache"); org.infinispan.configuration.cache.Configuration cacheConfiguration = new org.infinispan.configuration.cache.ConfigurationBuilder() .clustering().cacheMode(CacheMode.DIST_SYNC) .security().authorization().enable() .encoding().key().mediaType("application/x-protostream") .encoding().value().mediaType("application/x-protostream") .build(); admin.getOrCreateCache("bookCache", cacheConfiguration); } }
このようになります。
server/data/caches.xml
<?xml version="1.0"?> <infinispan xmlns="urn:infinispan:config:12.1"> <cache-container> <distributed-cache mode="SYNC" remote-timeout="17500" name="bookCache" statistics="true"> <encoding> <key media-type="application/x-protostream"/> <value media-type="application/x-protostream"/> </encoding> <locking concurrency-level="1000" acquire-timeout="15000" striping="false"/> <security> <authorization enabled="true"/> </security> <state-transfer timeout="60000"/> </distributed-cache> </cache-container> </infinispan>
テストコード内でInfinispan Serverに接続するためのメソッド定義
テストコード内では、Infinispan Server内に接続する必要があります。この定義は、以下のようにまとめました。
<K, V> void withRemoteCache(String cacheName, Consumer<RemoteCache<K, V>> func) { Configuration configuration = new ConfigurationBuilder() .uri("hotrod://ispn-admin:admin-password@172.17.0.2:11222,172.17.0.3:11222,172.17.0.4:11222" + "?context-initializers=org.littlewings.infinispan.entity.EntitiesInitializerImpl") .build(); try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) { RemoteCache<K, V> cache = manager.getCache(cacheName); func.accept(cache); } }
context-initializers
で、entity
モジュールで自動生成されたSerializationContextInitializer
インターフェースの実装クラスを
指定しているところがポイントですね。
Server Taskを実行する
それでは、Server Taskを呼び出してみます。
パラメーターなし、あり、それぞれで、以下のようなテストを作成しました。
@Test public void singleNodeTask() { this.<String, Book>withRemoteCache("bookCache", cache -> { books.forEach(b -> cache.put(b.getIsbn(), b)); Result result = cache.execute("price-sum-task", Collections.emptyMap()); assertThat(result.getValue()).isEqualTo(50446); cache.clear(); assertThat(cache).isEmpty(); }); } @Test public void singleNodeTaskWithParameters() { this.<String, Book>withRemoteCache("bookCache", cache -> { books.forEach(b -> cache.put(b.getIsbn(), b)); Map<String, Object> parameters = Map.of("greaterThanPrice", 5000); Result result = cache.execute("price-sum-task", parameters); assertThat(result.getValue()).isEqualTo(25669); cache.clear(); assertThat(cache).isEmpty(); }); }
Server Taskの呼び出し自体はとても簡単で、RemoteCache#execute
で呼び出すServer Task名と、Server Taskに必要な
パラメーターをMap
で指定します。
// パラメーターなし Result result = cache.execute("price-sum-task", Collections.emptyMap()); // パラメーターあり Map<String, Object> parameters = Map.of("greaterThanPrice", 5000); Result result = cache.execute("price-sum-task", parameters);
実行すると、こんな感じでInfinispan Server上で分散実行されていることが確認できます。
パラメーターなしの場合。
## Infinispan Server (172.17.0.4) 2021-07-10 14:06:09,434 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] set task context 2021-07-10 14:06:09,435 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] start task 2021-07-10 14:06:09,439 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] execution mode = ONE_NODE, cache size = 10, greaterThan = 0 2021-07-10 14:06:09,441 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1785285332 2021-07-10 14:06:09,441 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1789347531 2021-07-10 14:06:09,442 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1782169970 2021-07-10 14:06:09,443 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-0359439379 2021-07-10 14:06:09,443 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1484257807 2021-07-10 14:06:09,443 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1491936160 2021-07-10 14:06:09,443 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1365732355 2021-07-10 14:06:09,444 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1849518222 2021-07-10 14:06:09,444 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] end task, result = 50446 ## Infinspan Server(172.17.0.3) 2021-07-10 14:06:09,442 INFO (jgroups-22,129663e80859-52991) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1617295522 2021-07-10 14:06:09,442 INFO (jgroups-22,129663e80859-52991) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1788997829
パラメーターありの場合。
## Infinispan Server(172.17.0.2) 2021-07-10 14:08:58,613 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] set task context 2021-07-10 14:08:58,614 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] start task 2021-07-10 14:08:58,615 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] execution mode = ONE_NODE, cache size = 10, greaterThan = 5000 2021-07-10 14:08:58,616 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1617295522 2021-07-10 14:08:58,617 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1782169970 2021-07-10 14:08:58,617 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-0359439379 2021-07-10 14:08:58,618 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] end task, result = 25669 ## Infinispan Server(172.17.0.3) 2021-07-10 14:08:58,617 INFO (jgroups-25,129663e80859-52991) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1365732355
Server Taskの実行モードこそONE_NODE
(単一Node)ですが、Server Taskから起動される処理内で分散処理などを行い、
クラスタ全体に跨る操作をすることは可能です。
これで、デプロイしたServer Taskが動作していることが確認できました、と
オマケ
ここでは、いくつか内部を追ってみたことを書こうかなと思います。
Server Taskのインスタンスが生成されるタイミングと個数は?
Server Taskは、各Infinispan Serverで単一のインスタンスになります。
Infinispan Serverの起動時にExtentionとしてService Loaderの仕組みでロードされ、インスタンス化されます。
ところで、単一のインスタンスがゆえに、TaskContext
をsetterで受け取ると並行性が気になるところですが、
これはServer Task側ではどう対策するのがいいんでしょうね?ThreadLocal
でしょうか?
あるServer Taskの実行は、リクエストがキューで管理されていたり、同期化されていたりするわけではないので…。
Server Taskの実行の様子は?
Server Taskの実行は、Infinispan Serverでリクエストを受けるとまずはTaskRequestProcessor
によってハンドリングされます。
そして、ServerTaskEngine
によって実行モードが単一Nodeか全NodeかでServerTaskRunner
が決められ、実際のServer Taskを
起動します。
単一Nodeの場合のServerTaskRunner
はこちら。
実際のServer TaskのインスタンスをラップしたServerTaskWrapper
越しに、Server Taskを呼び出すだけです。
ここまででは動作させていませんが、全Nodeの場合はこちらのクラスが使われます。
※なぜ動作させなかったのは、後述します
そして、Cluster Executorの仕組みで全Node上でServer Taskを実行します。
まとめ?
ServerNGとなったInfinispan Serverで、ProtoStreamでMarshallingしつつServer Taskを使ってみました。
動かすまで一苦労だったのですが、Marshalling、認証・認可などいろいろ勉強になりました…。
この後に書いている「ハマったこと?」で記載しているソースコードも含めて、以下に置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-task-serverng-proto
ハマったこと?
ここまでの動作確認と、結論を出すにはいろいろとハマったことがありまして。
そちらを紹介したいと思います。
ADMIN権限を持っていないと、Server Taskを実行できない
ところでClusterRoleMapper
の説明を見ていると、EXEC
権限を含むapplication
グループにユーザーを作れば良さそうですが、
どうやらそれでは権限が足りないようで…。
認可設定が有効な場合、Server Taskの実行時に権限の有無を確認するのですが
この時に、権限確認の操作を行うためにADMIN
権限が必要になっていて、事実上今はADMIN
権限がないとServer Taskを
実行できなさそうです。
認可を有効にすると、CacheはSecureCacheImpl
でラップされるようです。
このため、今はこうなります、と。
仮に、application
グループのユーザーでServer Taskを実行した場合は、以下のように例外がスローされます。
7月 10, 2021 10:53:59 午後 org.infinispan.client.hotrod.impl.protocol.Codec20 checkForErrorsInResponseStatus WARN: ISPN004005: Error received from the server: java.lang.SecurityException: ISPN000287: Unauthorized access: subject 'Subject with principal(s): [ispn-app, RolePrincipal{name='application'}, InetAddressPrincipal [address=172.17.0.1/172.17.0.1]]' lacks 'ADMIN' permission org.infinispan.client.hotrod.exceptions.HotRodClientException:Request for messageId=61 returned server error (status=0x85): java.lang.SecurityException: ISPN000287: Unauthorized access: subject 'Subject with principal(s): [ispn-app, RolePrincipal{name='application'}, InetAddressPrincipal [address=172.17.0.1/172.17.0.1]]' lacks 'ADMIN' permission
そのため、今回は全権限(ALL
)を持ったadmin
グループのユーザーを使いました。
実行モードを全Nodeにできない
今回、Server Taskの実行モードを単一Node(ONE_NODE
)としましたが、全Node(ALL_NODES
)とすることが
できませんでした。
これは、Marshallingの仕組みをProtoStreamにしている場合、です。
理由はこちらで、実行モードをALL_NODES
とすると、Hot Rod Clientには結果がList
(各Nodeでの実行結果が含まれる)と
なるのですが、これがCollections#synchronizedList
で作成されます。
ProtoStreamには、JDKで用意されているクラスのいくつかにはデフォルトでMarshallerを用意しています。
ですが、この中にはCollections#synchronizedList
に対応するものがないため、Server Taskの実行完了時にMarshallingできずに
失敗します。
まあ、全体を跨る処理は単一Nodeでの実行で分散処理をすることになる気がするので、そんなに困らない気はするの
ですが…どうでしょう?
ProtoStreamでCollections#synchronizedList
をMarshallingできないことは、テストコードで簡単に確認できます。
以下が、Collections#synchronizedList
とArrayList
をそれぞれProtoStreamでMarshallingするテストコードです。
@Test public void failMarshallingSynchronizedList() { this.<String, Book>withRemoteCache("bookCache", cache -> { Marshaller marshaller = cache.getRemoteCacheManager().getMarshaller(); List<String> list = Collections.synchronizedList(new ArrayList<>()); list.add("Hello"); list.add("World"); assertThatThrownBy(() -> marshaller.objectToByteBuffer(list)) .hasMessageContaining("No marshaller registered for object of Java type java.util.Collections$SynchronizedRandomAccessList") .isInstanceOf(IllegalArgumentException.class); }); } @Test public void marshallingArrayList() { this.<String, Book>withRemoteCache("bookCache", cache -> { Marshaller marshaller = cache.getRemoteCacheManager().getMarshaller(); List<String> list = new ArrayList<>(); list.add("Hello"); list.add("World"); try { byte[] binary = marshaller.objectToByteBuffer(list); assertThat(binary).hasSize(67); assertThat((List<String>) marshaller.objectFromByteBuffer(binary)) .isEqualTo(list) .isInstanceOf(ArrayList.class) .containsExactly("Hello", "World"); } catch (IOException | InterruptedException | ClassNotFoundException e) { fail("fail", e); } }); }
ArrayList
の方はうまくいきますが、Collections#synchronizedList
の方は失敗します。
これが、Infinispan ServerにデプロイしたServer Taskの実行時にも発生します。
なお、全Node(ALL_NODES
)の実行モードで作成したServer Taskはデプロイ自体は可能ですし、Marshallingの仕組みを
Java標準のシリアライズにすれば実行可能になります。
おそらく、Cacheに保存するデータはProtoStreamでエンコーディングし、Server Taskの実行時にHot Rod Clientと
Infinispan Serverの間のやり取りをJava標準のシリアライズの仕組みとすれば、データの保存はProtoStreamのままでも
良いかもしれません。が、そこまでは試していません。
ちなみに、ServerNGとなる前まではCollections#synchronizedList
ではなくArrayList
だったようです。
実行モードを全Nodeにすると、パラメーターがStringに強制変換される
それでもと、全Node(ALL_NODES
)で動かしてみたのですが、ちょっと驚くことにこの場合はパラメーターが
String
に強制変換されます。
以下が、単一Node向けに作成したServer Taskの、全Node(ALL_NODES
)版です。
task/src/main/java/org/littlewings/infinispan/task/PriceSumAllNodesTask.java
package org.littlewings.infinispan.task; import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; import org.infinispan.Cache; import org.infinispan.tasks.ServerTask; import org.infinispan.tasks.TaskContext; import org.infinispan.tasks.TaskExecutionMode; import org.jboss.logging.Logger; import org.littlewings.infinispan.entity.Book; import org.littlewings.infinispan.entity.Result; public class PriceSumAllNodesTask implements ServerTask<Result> { TaskContext taskContext; @Override public void setTaskContext(TaskContext taskContext) { Logger logger = Logger.getLogger(PriceSumAllNodesTask.class); logger.infof("set task context"); this.taskContext = taskContext; } @Override public Result call() throws Exception { Logger logger = Logger.getLogger(PriceSumAllNodesTask.class); logger.infof("start task"); Map<String, ?> parameters = taskContext.getParameters().orElse(Collections.emptyMap()); int greaterThanPrice; if (parameters.containsKey("greaterThanPrice")) { greaterThanPrice = Integer.parseInt((String) parameters.get("greaterThanPrice")); // ALL_NODESの時は値はStringのみ } else { greaterThanPrice = 0; } Cache<String, Book> cache = (Cache<String, Book>) taskContext.getCache().orElseThrow(); logger.infof("execution mode = %s, cache size = %d, greaterThan = %d", getExecutionMode(), cache.size(), greaterThanPrice); Result result = Result.create( cache .values() .stream() .filter(book -> book.getPrice() > greaterThanPrice) .map(book -> { Logger l = Logger.getLogger(PriceSumAllNodesTask.class); l.infof("map entry = %s", book.getIsbn()); return book.getPrice(); }) .collect(() -> Collectors.summingInt(price -> price)) ); logger.infof("end task, result = %d", result.getValue()); return result; } @Override public String getName() { return "price-sum-all-nodes-task"; } @Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ALL_NODES; // default, ONE_NODE } }
Service Loaderの仕組みでロードするファイルは、こうなります。
task/src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.task.PriceSumTask org.littlewings.infinispan.task.PriceSumAllNodesTask
実行モードは当然違うのですが
@Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ALL_NODES; // default, ONE_NODE }
その他に変わった差異として、パラメーターがString
になっています。
if (parameters.containsKey("greaterThanPrice")) { greaterThanPrice = Integer.parseInt((String) parameters.get("greaterThanPrice")); // ALL_NODESの時は値はStringのみ } else { greaterThanPrice = 0; }
これはどうしたことだろう?と思ったのですが、意図的に(?)Object#toString
していますね。
これは、パラメーターをProtoStreamでMarshallingする関係上、型を決めないといけなかったのかな?と思うのですが、
どうでしょうか。
なお、単一Nodeの場合は、このようなことは発生しません。
Collections#synchronizedList
がMarshallingできないことも含めて、全Nodeでの実行モードで動かした時のテストコードはこちらです。
@Test public void allNodesTask() { this.<String, Book>withRemoteCache("bookCache", cache -> { books.forEach(b -> cache.put(b.getIsbn(), b)); assertThatThrownBy(() -> cache.execute("price-sum-all-nodes-task", Collections.emptyMap())) .isInstanceOf(HotRodClientException.class) .hasMessage("org.infinispan.commons.marshall.MarshallingException: ISPN000615: Unable to unmarshall 'java.util.Collections$SynchronizedRandomAccessList' as a marshaller is not present in the user or global SerializationContext"); cache.clear(); assertThat(cache).isEmpty(); }); } @Test public void allNodesTaskWithParameters() { this.<String, Book>withRemoteCache("bookCache", cache -> { books.forEach(b -> cache.put(b.getIsbn(), b)); Map<String, Object> parameters = Map.of("greaterThanPrice", 5000); assertThatThrownBy(() -> cache.execute("price-sum-all-nodes-task", parameters)) .isInstanceOf(HotRodClientException.class) .hasMessage("org.infinispan.commons.marshall.MarshallingException: ISPN000615: Unable to unmarshall 'java.util.Collections$SynchronizedRandomAccessList' as a marshaller is not present in the user or global SerializationContext"); cache.clear(); assertThat(cache).isEmpty(); }); }
ハマったことと、その記録でした…。