これは、なにをしたくて書いたもの?
Infinispan ServerがServerNGと呼ばれる形態になってから、Server Taskを動かしたことがないなぁと思いまして。
MarshallingもデフォルトがJBoss MarshallingからProtoStreamになったことですし、このあたりをまとめて試してみようと
思います。
Server Taskを実行するなら、分散Streamかなとも思ったのですが、まずは分散処理とProtoStreamの組み合わせを確認した方が
いいかなと思ったので、それについては先にやっておきました。
Infinispan 12.1でのMarshalling/Encodingと分散処理と - CLOVER🍀
今回はServer Taskに絞って扱いたいと思います。
なお、ServerNGになる前に、Server Taskを動かした時のエントリはこちら。
InfinispanのRemote Task Executionを使って、Infinispan Server上でタスクを実行する - CLOVER🍀
Server Task
Infninspan Serverにおける、Server Taskについてのドキュメントはこちら。
Remotely Executing Server-Side Tasks
Infinispan CLI、REST API、Hot Rod Clientから呼び出せるTaskを定義して、Infinispan Serverにデプロイできます。
Taskの特徴は、以下のようなところでしょうか。
- Taskは、Javaで作成するか、JavaScriptなどのスクリプト言語で作成する
- Taskを実行するのが、単一のNode(リクエストを受けたNode)か全Nodeかを選択できる
- このモードを決めるのは、デプロイされるTask自身となる
- Taskは、EmbeddedなInfinispanの機能を使って作成できる
- Taskは、呼び出し元からパラメーターを受け取れる
- デプロイ方法は、以下の2つ
今回は、Javaで作成するTaskを扱います。
JavaでのServer Taskの作り方は、
ServerTaskインターフェースを実装したクラスを作成して
そのクラス名を書いたMETA-INF/services/org.infinispan.tasks.ServerTaskファイルを含めたJARファイルを作成して、
Infinispan Serverのserver/libディレクトリにコピーします。
Deploying Server Tasks to Infinispan Servers
この時、Infinispan Serverを再起動する必要があります。
では、この流れでServer Taskを作成してデプロイしていきましょう。
環境
今回の環境は、こちらです。
$ java --version openjdk 11.0.11 2021-04-20 OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04) OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing) $ mvn --version Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 11.0.11, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-77-generic", arch: "amd64", family: "unix"
Infinispanは12.1.6.Finalを使い、3 Node用意します。各Nodeは、171.17.0.2〜4で動作しているものとします。
お題
Taskは、こんなお題で作成しようと思います。
- Cacheに格納するのは自作のクラスとし、Server Taskの戻り値も自作のクラスとする
- MarshallingにはProtoStreamを使用する
- 書籍の合計金額を計算するServer Taskを作成する
- パラメーターも扱う
なお、周辺テーマ、ハマって切り落としたテーマはこのあたりです。
- 認証、認可設定
- ProtoStreamと実行モードの相性
では、進めていきましょう。
プロジェクト作成
今回は、Mavenのマルチプロジェクト構成で作成します。以下の3つのモジュールを持つ構成としましょう。
それぞれ、こんな内容にします。
- entity … Cacheに格納するクラスや、Server Taskの戻り値に使うクラス、ProtoStreamを使ったMarshalling設定
- task … Server Task
- client … Server Taskを実行するテストコード
以降、それぞれ順次載せていきます。
親モジュール
全体の親モジュールの設定は、こちら。といっても、pom.xmlだけですが。
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.littlewings</groupId> <artifactId>remote-task-serverng-proto</artifactId> <packaging>pom</packaging> <version>0.0.1-SNAPSHOT</version> <modules> <module>entity</module> <module>task</module> <module>client</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> </project>
entityモジュール
entityモジュールのpom.xmlは、こちら。
entity/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>remote-task-serverng-proto</artifactId> <groupId>org.littlewings</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>entity</artifactId> <dependencies> <dependency> <groupId>org.infinispan.protostream</groupId> <artifactId>protostream-processor</artifactId> <version>4.4.1.Final</version> <optional>true</optional> </dependency> </dependencies> </project>
依存関係は、protostream-processorだけです。
書籍クラス。こちらは、Cacheに格納します。
entity/src/main/java/org/littlewings/infinispan/entity/Book.java
package org.littlewings.infinispan.entity; import org.infinispan.protostream.annotations.ProtoFactory; import org.infinispan.protostream.annotations.ProtoField; public class Book { @ProtoField(number = 1) String isbn; @ProtoField(number = 2) String title; @ProtoField(number = 3, defaultValue = "0") int price; @ProtoFactory public static Book create(String isbn, String title, int price) { Book book = new Book(); book.setIsbn(isbn); book.setTitle(title); book.setPrice(price); return book; } // getter/setterは省略 }
続いて、Server Taskの戻り値に使う結果クラス。こちらも、ProtoStreamでMarshallingする設定にしています。
entity/src/main/java/org/littlewings/infinispan/entity/Result.java
package org.littlewings.infinispan.entity; import org.infinispan.protostream.annotations.ProtoFactory; import org.infinispan.protostream.annotations.ProtoField; public class Result { @ProtoField(number = 1, defaultValue = "0") int value; @ProtoFactory public static Result create(int value) { Result result = new Result(); result.setValue(value); return result; } // getter/setterは省略 }
この2つのクラスのMarshallingの定義を生成するための、SerializationContextInitializerの定義。
entity/src/main/java/org/littlewings/infinispan/entity/EntitiesInitializer.java
package org.littlewings.infinispan.entity; import org.infinispan.protostream.SerializationContextInitializer; import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder; @AutoProtoSchemaBuilder( includeClasses = {Book.class, Result.class}, schemaFileName = "entities.proto", schemaFilePath = "proto", schemaPackageName = "org.littlewings.infinispan.entity" ) public interface EntitiesInitializer extends SerializationContextInitializer { }
これで、Cacheに格納するクラスとServer Taskの戻り値に使うクラスの定義は終わりです。
taskモジュール
続いて、taskモジュール。こちらのドキュメントに沿って作成します。
pom.xmlは、こんな感じです。
task/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>remote-task-serverng-proto</artifactId> <groupId>org.littlewings</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>task</artifactId> <dependencies> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-tasks-api</artifactId> <version>12.1.6.Final</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.littlewings</groupId> <artifactId>entity</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies> </project>
依存関係はinfinispan-tasks-apiと、先ほど作成したentityモジュールを追加しています。
そして、Server Taskの定義。
task/src/main/java/org/littlewings/infinispan/task/PriceSumTask.java
package org.littlewings.infinispan.task; import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; import org.infinispan.Cache; import org.infinispan.tasks.ServerTask; import org.infinispan.tasks.TaskContext; import org.infinispan.tasks.TaskExecutionMode; import org.jboss.logging.Logger; import org.littlewings.infinispan.entity.Book; import org.littlewings.infinispan.entity.Result; public class PriceSumTask implements ServerTask<Result> { TaskContext taskContext; @Override public void setTaskContext(TaskContext taskContext) { Logger logger = Logger.getLogger(PriceSumTask.class); logger.infof("set task context"); this.taskContext = taskContext; } @Override public Result call() throws Exception { Logger logger = Logger.getLogger(PriceSumTask.class); logger.infof("start task"); Map<String, ?> parameters = taskContext.getParameters().orElse(Collections.emptyMap()); int greaterThanPrice; if (parameters.containsKey("greaterThanPrice")) { greaterThanPrice = (Integer) parameters.get("greaterThanPrice"); // ALL_NODESの時は値はStringのみ } else { greaterThanPrice = 0; } Cache<String, Book> cache = (Cache<String, Book>) taskContext.getCache().orElseThrow(); logger.infof("execution mode = %s, cache size = %d, greaterThan = %d", getExecutionMode(), cache.size(), greaterThanPrice); Result result = Result.create( cache .values() .stream() .filter(book -> book.getPrice() > greaterThanPrice) .map(book -> { Logger l = Logger.getLogger(PriceSumTask.class); l.infof("map entry = %s", book.getIsbn()); return book.getPrice(); }) .collect(() -> Collectors.summingInt(price -> price)) ); logger.infof("end task, result = %d", result.getValue()); return result; } @Override public String getName() { return "price-sum-task"; } @Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ONE_NODE; // default, ONE_NODE } }
ServerTaskインターフェースを実装し、戻り値はentityモジュールで作成したResultクラスとします。
public class PriceSumTask implements ServerTask<Result> {
setTaskContextメソッドでは、TaskContextを受け取ります。このTaskContextから、Cacheやパラメーターを取得します。
@Override public void setTaskContext(TaskContext taskContext) { Logger logger = Logger.getLogger(PriceSumTask.class); logger.infof("set task context"); this.taskContext = taskContext; }
Server Taskには名前を指定する必要があり、これはgetNameメソッドで定義します。この名前は、Client側からServer Taskを
呼び出す時に使用します。
@Override public String getName() { return "price-sum-task"; }
実行モード。このServer Taskが単一Nodeで実行されるのか、全Nodeで実行されるのかを定義します。このメソッドは
オーバーライド必須ではなく、デフォルトでは単一Nodeで実行されます。今回は明示しました。
@Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ONE_NODE; // default, ONE_NODE }
そして、最後はServer Taskの処理内容です。
@Override public Result call() throws Exception { Logger logger = Logger.getLogger(PriceSumTask.class); logger.infof("start task"); Map<String, ?> parameters = taskContext.getParameters().orElse(Collections.emptyMap()); int greaterThanPrice; if (parameters.containsKey("greaterThanPrice")) { greaterThanPrice = (Integer) parameters.get("greaterThanPrice"); // ALL_NODESの時は値はStringのみ } else { greaterThanPrice = 0; } Cache<String, Book> cache = (Cache<String, Book>) taskContext.getCache().orElseThrow(); logger.infof("execution mode = %s, cache size = %d, greaterThan = %d", getExecutionMode(), cache.size(), greaterThanPrice); Result result = Result.create( cache .values() .stream() .filter(book -> book.getPrice() > greaterThanPrice) .map(book -> { Logger l = Logger.getLogger(PriceSumTask.class); l.infof("map entry = %s", book.getIsbn()); return book.getPrice(); }) .collect(() -> Collectors.summingInt(price -> price)) ); logger.infof("end task, result = %d", result.getValue()); return result; }
Cacheに格納されているBookクラスの価格を合算するのですが、パラメーターで合算する価格の下限を任意で指定できるように
しています。まあ、パラメーターを使う例として…。
あと、Server Task自身がSerializableを要求されないように、ログ出力しつつもローカル変数で処理が完結するように
しています。
作成したクラスの名前は、META-INF/services/org.infinispan.tasks.ServerTaskファイルに定義してService Loaderから
読み込まれるようにしておきます。
task/src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.task.PriceSumTask
こちらで自動生成してもよかった気はしますが、今回は明示的に作ることにしました。
ところで、Serializableについてですが。Server TaskをSerializableにした場合や、戻り値でJava標準のシリアライズを使う場合は
こちらに沿ってMarshallerをJavaSerializationMarshallerに設定するとともに、デシリアライズ可能なクラスのリストに
追加する必要があります。
Allowing deserialization of Java classes
このあたりも、こちらで触れています。
Infinispan 12.1でのMarshalling/Encodingと分散処理と - CLOVER🍀
仮に設定したとすると、以下のようになります。
※Infinispan Server側の設定です
<cache-container name="default" statistics="true"> <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/> <security> <authorization/> </security> <serialization marshaller="org.infinispan.commons.marshall.JavaSerializationMarshaller"> <allow-list> <regex>org\.littlewings\.infinispan\.task\..+</regex> <!-- その他、必要に応じて追加 --> </allow-list> </serialization> </cache-container>
Infinispan ServerにServer Taskをデプロイする
ここまでで、Infinispan Serverにデプロイするクラスができあがったので、パッケージングしてデプロイしましょう。
こちらのドキュメントに沿って進めます。
Deploying Server Tasks to Infinispan Servers
まずは、パッケージング。
$ mvn -pl task -am packag
作成されたJARファイル。
$ ll entity/target/entity-0.0.1-SNAPSHOT.jar task/target/task-0.0.1-SNAPSHOT.jar -rw-rw-r-- 1 xxxxx xxxxx 9213 7月 10 21:32 entity/target/entity-0.0.1-SNAPSHOT.jar -rw-rw-r-- 1 xxxxx xxxxx 8528 7月 10 21:32 task/target/task-0.0.1-SNAPSHOT.jar
こちらを、Infinispan Serverのserver/libディレクトリにコピーします。
$ cp /path/to/entity/target/entity-0.0.1-SNAPSHOT.jar /path/to/task/target/task-0.0.1-SNAPSHOT.jar server/lib
コピーしたら、Infinispan Serverを再起動してください。
Infinispan ServerがServer Taskを認識すると、以下のように「Extensionをロードした」というログが出力されます。
2021-07-10 12:47:27,529 INFO (main) [org.infinispan.SERVER] ISPN080027: Loaded extension 'org.littlewings.infinispan.task.PriceSumTask'
この操作を、クラスタを構成するInfinispan Server分だけ行います。今回は、3 Nodeですね。
これで、Server Taskのデプロイは完了です。
ちなみに、server/libディレクトリにはREADME.mdファイルがあり、カスタムJARファイルを配置するための
ディレクトリであることが書かれています。
server/lib/README.txt
Place any custom jars you wish to add to the server classpath in this directory.
clientモジュールでの、テストコードの雛形
最後は、デプロイしたServer Taskを動作確認を行うためのテストコードを書いていきます。
まずはpom.xmlから。
client/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>remote-task-serverng-proto</artifactId> <groupId>org.littlewings</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>client</artifactId> <dependencies> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-client-hotrod</artifactId> <version>12.1.6.Final</version> </dependency> <dependency> <groupId>org.littlewings</groupId> <artifactId>entity</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <!-- for cache administration --> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <version>12.1.6.Final</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.7.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.7.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.20.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build> </project>
Server Taskを実行するために必要なのはHot Rod Clientだけなので、infinispan-client-hotrodと今回使用するentityモジュールが
依存関係にあれば十分です。
なのですが、Cacheの定義はテストコード内で作成しようと思うので、infinispan-coreも追加しています。
あとはJUnit 5とAssertJをテスト用ライブラリとして指定。
テストコードの雛形は、こちらです。
client/src/test/java/org/littlewings/infinispan/client/TaskClientTest.java
package org.littlewings.infinispan.client; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Consumer; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCacheManager; import org.infinispan.client.hotrod.RemoteCacheManagerAdmin; import org.infinispan.client.hotrod.configuration.Configuration; import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; import org.infinispan.client.hotrod.exceptions.HotRodClientException; import org.infinispan.commons.marshall.Marshaller; import org.infinispan.configuration.cache.CacheMode; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.littlewings.infinispan.entity.Book; import org.littlewings.infinispan.entity.Result; import static org.assertj.core.api.Assertions.*; public class TaskClientTest { List<Book> books = List.of( Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5344), Book.create("978-1849518222", "Infinispan Data Grid Platform", 3608), Book.create("978-0359439379", "The Apache Ignite Book", 7686), Book.create("978-1365732355", "High Performance in-memory computing with Apache Ignite", 6342), Book.create("978-1789347531", "Apache Ignite Quick Start Guide: Distributed data caching and processing made easy", 3638), Book.create("978-1785285332", "Getting Started with Hazelcast - Second Edition: Get acquainted with the highly scalable data grid, Hazelcast, and learn how to bring its powerful in-memory features into your application", 4209), Book.create("978-1617295522", "Spark in Action, Second Edition: Covers Apache Spark 3 with Examples in Java, Python, and Scala", 6297), Book.create("978-1484257807", "Beginning Apache Spark Using Azure Databricks: Unleashing Large Cluster Analytics in the Cloud", 4817), Book.create("978-1788997829", "Apache Kafka Quick Start Guide: Leverage Apache Kafka 2.0 to simplify real-time data processing for distributed applications", 3516), Book.create("978-1491936160", "Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale", 4989) ); // ここに、Cacheのセットアップやテストコードを書く }
フィールドに定義しているのは、今回のテスト内で扱うデータですね。
では、ここからもうちょっと書いていきましょう。
Cacheを作成と認証設定
テストの最初に、Cacheを削除・作成するようにしましょう。以下のようなコードを、テストの実行前に1度実行します。
@BeforeAll static void createCache() { Configuration configuration = new ConfigurationBuilder() .uri("hotrod://ispn-admin:admin-password@172.17.0.2:11222,172.17.0.3:11222,172.17.0.4:11222") .build(); try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) { RemoteCacheManagerAdmin admin = manager.administration(); admin.removeCache("bookCache"); org.infinispan.configuration.cache.Configuration cacheConfiguration = new org.infinispan.configuration.cache.ConfigurationBuilder() .clustering().cacheMode(CacheMode.DIST_SYNC) .security().authorization().enable() .encoding().key().mediaType("application/x-protostream") .encoding().value().mediaType("application/x-protostream") .build(); admin.getOrCreateCache("bookCache", cacheConfiguration); } }
RemoteCacheManagerAdminを使って、Cacheをプログラムで定義します。このために、infinispan-coreを依存関係に
追加しました。
Hot Rodで接続する際のURIにユーザーのIDとパスワードを指定していますが、これは次に作成します。
Infinispan 12.1.0.Finalからデフォルトで認証が必要になっているようなので、この挙動に従います。
※これまではデフォルトでは認証は無効でした
作成しているCacheは、以下の定義です。
- Distributed Cache
- 認可を有効
- エンコーディングはProtoStream
エンコーディングは今回必須ではないのですが、指定しておかないとInfinispan Serverから警告されるので、明示的しておくことに
します。
なお、Cacheは最初に削除して、それから再定義する形を取っています。
認証・認可設定
認証、認可が必要になるので、Infinispan Server側に設定を行います。
ここで紹介する操作は、クラスタを構成する各Infinispan Serverそれぞれに行います。
まずは、ユーザーを作成します。
$ bin/cli.sh user create ispn-admin -p admin-password -g admin
ユーザーは、adminグループに所属するようにしました。
次に、認可設定を行います。
Server Taskを実行するためには、EXEC権限が必要です。
認可設定は簡単のため、ClusterRoleMapperを使用しましょう。
ClusterRoleMapperはデフォルトの認可設定で、ユーザーが所属するグループ名に合わせて権限が決まります。
デフォルトの認可設定なので以下の状態ですでにClusterRoleMapperが使われることになるのですが、
<cache-container name="default" statistics="true"> <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/> <security> <authorization/> </security> </cache-container>
明示的に設定するなら、以下のようになります。
<cache-container name="default" statistics="true"> <transport cluster="${infinispan.cluster.name:cluster}" stack="${infinispan.cluster.stack:tcp}" node-name="${infinispan.node.name:}"/> <security> <authorization> <cluster-role-mapper/> </authorization> </security> </cache-container>
今回は、ALL権限を持ったadminグループを使います。
実際のCacheの定義
これで、こちらのコードで定義したCacheを実際に作成すると
@BeforeAll static void createCache() { Configuration configuration = new ConfigurationBuilder() .uri("hotrod://ispn-admin:admin-password@172.17.0.2:11222,172.17.0.3:11222,172.17.0.4:11222") .build(); try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) { RemoteCacheManagerAdmin admin = manager.administration(); admin.removeCache("bookCache"); org.infinispan.configuration.cache.Configuration cacheConfiguration = new org.infinispan.configuration.cache.ConfigurationBuilder() .clustering().cacheMode(CacheMode.DIST_SYNC) .security().authorization().enable() .encoding().key().mediaType("application/x-protostream") .encoding().value().mediaType("application/x-protostream") .build(); admin.getOrCreateCache("bookCache", cacheConfiguration); } }
このようになります。
server/data/caches.xml
<?xml version="1.0"?> <infinispan xmlns="urn:infinispan:config:12.1"> <cache-container> <distributed-cache mode="SYNC" remote-timeout="17500" name="bookCache" statistics="true"> <encoding> <key media-type="application/x-protostream"/> <value media-type="application/x-protostream"/> </encoding> <locking concurrency-level="1000" acquire-timeout="15000" striping="false"/> <security> <authorization enabled="true"/> </security> <state-transfer timeout="60000"/> </distributed-cache> </cache-container> </infinispan>
テストコード内でInfinispan Serverに接続するためのメソッド定義
テストコード内では、Infinispan Server内に接続する必要があります。この定義は、以下のようにまとめました。
<K, V> void withRemoteCache(String cacheName, Consumer<RemoteCache<K, V>> func) { Configuration configuration = new ConfigurationBuilder() .uri("hotrod://ispn-admin:admin-password@172.17.0.2:11222,172.17.0.3:11222,172.17.0.4:11222" + "?context-initializers=org.littlewings.infinispan.entity.EntitiesInitializerImpl") .build(); try (RemoteCacheManager manager = new RemoteCacheManager(configuration)) { RemoteCache<K, V> cache = manager.getCache(cacheName); func.accept(cache); } }
context-initializersで、entityモジュールで自動生成されたSerializationContextInitializerインターフェースの実装クラスを
指定しているところがポイントですね。
Server Taskを実行する
それでは、Server Taskを呼び出してみます。
パラメーターなし、あり、それぞれで、以下のようなテストを作成しました。
@Test public void singleNodeTask() { this.<String, Book>withRemoteCache("bookCache", cache -> { books.forEach(b -> cache.put(b.getIsbn(), b)); Result result = cache.execute("price-sum-task", Collections.emptyMap()); assertThat(result.getValue()).isEqualTo(50446); cache.clear(); assertThat(cache).isEmpty(); }); } @Test public void singleNodeTaskWithParameters() { this.<String, Book>withRemoteCache("bookCache", cache -> { books.forEach(b -> cache.put(b.getIsbn(), b)); Map<String, Object> parameters = Map.of("greaterThanPrice", 5000); Result result = cache.execute("price-sum-task", parameters); assertThat(result.getValue()).isEqualTo(25669); cache.clear(); assertThat(cache).isEmpty(); }); }
Server Taskの呼び出し自体はとても簡単で、RemoteCache#executeで呼び出すServer Task名と、Server Taskに必要な
パラメーターをMapで指定します。
// パラメーターなし Result result = cache.execute("price-sum-task", Collections.emptyMap()); // パラメーターあり Map<String, Object> parameters = Map.of("greaterThanPrice", 5000); Result result = cache.execute("price-sum-task", parameters);
実行すると、こんな感じでInfinispan Server上で分散実行されていることが確認できます。
パラメーターなしの場合。
## Infinispan Server (172.17.0.4) 2021-07-10 14:06:09,434 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] set task context 2021-07-10 14:06:09,435 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] start task 2021-07-10 14:06:09,439 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] execution mode = ONE_NODE, cache size = 10, greaterThan = 0 2021-07-10 14:06:09,441 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1785285332 2021-07-10 14:06:09,441 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1789347531 2021-07-10 14:06:09,442 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1782169970 2021-07-10 14:06:09,443 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-0359439379 2021-07-10 14:06:09,443 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1484257807 2021-07-10 14:06:09,443 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1491936160 2021-07-10 14:06:09,443 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1365732355 2021-07-10 14:06:09,444 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1849518222 2021-07-10 14:06:09,444 INFO (SINGLE_PORT-ServerIO-3-9) [org.littlewings.infinispan.task.PriceSumTask] end task, result = 50446 ## Infinspan Server(172.17.0.3) 2021-07-10 14:06:09,442 INFO (jgroups-22,129663e80859-52991) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1617295522 2021-07-10 14:06:09,442 INFO (jgroups-22,129663e80859-52991) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1788997829
パラメーターありの場合。
## Infinispan Server(172.17.0.2) 2021-07-10 14:08:58,613 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] set task context 2021-07-10 14:08:58,614 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] start task 2021-07-10 14:08:58,615 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] execution mode = ONE_NODE, cache size = 10, greaterThan = 5000 2021-07-10 14:08:58,616 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1617295522 2021-07-10 14:08:58,617 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1782169970 2021-07-10 14:08:58,617 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-0359439379 2021-07-10 14:08:58,618 INFO (SINGLE_PORT-ServerIO-3-1) [org.littlewings.infinispan.task.PriceSumTask] end task, result = 25669 ## Infinispan Server(172.17.0.3) 2021-07-10 14:08:58,617 INFO (jgroups-25,129663e80859-52991) [org.littlewings.infinispan.task.PriceSumTask] map entry = 978-1365732355
Server Taskの実行モードこそONE_NODE(単一Node)ですが、Server Taskから起動される処理内で分散処理などを行い、
クラスタ全体に跨る操作をすることは可能です。
これで、デプロイしたServer Taskが動作していることが確認できました、と
オマケ
ここでは、いくつか内部を追ってみたことを書こうかなと思います。
Server Taskのインスタンスが生成されるタイミングと個数は?
Server Taskは、各Infinispan Serverで単一のインスタンスになります。
Infinispan Serverの起動時にExtentionとしてService Loaderの仕組みでロードされ、インスタンス化されます。
ところで、単一のインスタンスがゆえに、TaskContextをsetterで受け取ると並行性が気になるところですが、
これはServer Task側ではどう対策するのがいいんでしょうね?ThreadLocalでしょうか?
あるServer Taskの実行は、リクエストがキューで管理されていたり、同期化されていたりするわけではないので…。
Server Taskの実行の様子は?
Server Taskの実行は、Infinispan Serverでリクエストを受けるとまずはTaskRequestProcessorによってハンドリングされます。
そして、ServerTaskEngineによって実行モードが単一Nodeか全NodeかでServerTaskRunnerが決められ、実際のServer Taskを
起動します。
単一Nodeの場合のServerTaskRunnerはこちら。
実際のServer TaskのインスタンスをラップしたServerTaskWrapper越しに、Server Taskを呼び出すだけです。
ここまででは動作させていませんが、全Nodeの場合はこちらのクラスが使われます。
※なぜ動作させなかったのは、後述します
そして、Cluster Executorの仕組みで全Node上でServer Taskを実行します。
まとめ?
ServerNGとなったInfinispan Serverで、ProtoStreamでMarshallingしつつServer Taskを使ってみました。
動かすまで一苦労だったのですが、Marshalling、認証・認可などいろいろ勉強になりました…。
この後に書いている「ハマったこと?」で記載しているソースコードも含めて、以下に置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-task-serverng-proto
ハマったこと?
ここまでの動作確認と、結論を出すにはいろいろとハマったことがありまして。
そちらを紹介したいと思います。
ADMIN権限を持っていないと、Server Taskを実行できない
ところでClusterRoleMapperの説明を見ていると、EXEC権限を含むapplicationグループにユーザーを作れば良さそうですが、
どうやらそれでは権限が足りないようで…。
認可設定が有効な場合、Server Taskの実行時に権限の有無を確認するのですが
この時に、権限確認の操作を行うためにADMIN権限が必要になっていて、事実上今はADMIN権限がないとServer Taskを
実行できなさそうです。
認可を有効にすると、CacheはSecureCacheImplでラップされるようです。
このため、今はこうなります、と。
仮に、applicationグループのユーザーでServer Taskを実行した場合は、以下のように例外がスローされます。
7月 10, 2021 10:53:59 午後 org.infinispan.client.hotrod.impl.protocol.Codec20 checkForErrorsInResponseStatus
WARN: ISPN004005: Error received from the server: java.lang.SecurityException: ISPN000287: Unauthorized access: subject 'Subject with principal(s): [ispn-app, RolePrincipal{name='application'}, InetAddressPrincipal [address=172.17.0.1/172.17.0.1]]' lacks 'ADMIN' permission
org.infinispan.client.hotrod.exceptions.HotRodClientException:Request for messageId=61 returned server error (status=0x85): java.lang.SecurityException: ISPN000287: Unauthorized access: subject 'Subject with principal(s): [ispn-app, RolePrincipal{name='application'}, InetAddressPrincipal [address=172.17.0.1/172.17.0.1]]' lacks 'ADMIN' permission
そのため、今回は全権限(ALL)を持ったadminグループのユーザーを使いました。
実行モードを全Nodeにできない
今回、Server Taskの実行モードを単一Node(ONE_NODE)としましたが、全Node(ALL_NODES)とすることが
できませんでした。
これは、Marshallingの仕組みをProtoStreamにしている場合、です。
理由はこちらで、実行モードをALL_NODESとすると、Hot Rod Clientには結果がList(各Nodeでの実行結果が含まれる)と
なるのですが、これがCollections#synchronizedListで作成されます。
ProtoStreamには、JDKで用意されているクラスのいくつかにはデフォルトでMarshallerを用意しています。
ですが、この中にはCollections#synchronizedListに対応するものがないため、Server Taskの実行完了時にMarshallingできずに
失敗します。
まあ、全体を跨る処理は単一Nodeでの実行で分散処理をすることになる気がするので、そんなに困らない気はするの
ですが…どうでしょう?
ProtoStreamでCollections#synchronizedListをMarshallingできないことは、テストコードで簡単に確認できます。
以下が、Collections#synchronizedListとArrayListをそれぞれProtoStreamでMarshallingするテストコードです。
@Test public void failMarshallingSynchronizedList() { this.<String, Book>withRemoteCache("bookCache", cache -> { Marshaller marshaller = cache.getRemoteCacheManager().getMarshaller(); List<String> list = Collections.synchronizedList(new ArrayList<>()); list.add("Hello"); list.add("World"); assertThatThrownBy(() -> marshaller.objectToByteBuffer(list)) .hasMessageContaining("No marshaller registered for object of Java type java.util.Collections$SynchronizedRandomAccessList") .isInstanceOf(IllegalArgumentException.class); }); } @Test public void marshallingArrayList() { this.<String, Book>withRemoteCache("bookCache", cache -> { Marshaller marshaller = cache.getRemoteCacheManager().getMarshaller(); List<String> list = new ArrayList<>(); list.add("Hello"); list.add("World"); try { byte[] binary = marshaller.objectToByteBuffer(list); assertThat(binary).hasSize(67); assertThat((List<String>) marshaller.objectFromByteBuffer(binary)) .isEqualTo(list) .isInstanceOf(ArrayList.class) .containsExactly("Hello", "World"); } catch (IOException | InterruptedException | ClassNotFoundException e) { fail("fail", e); } }); }
ArrayListの方はうまくいきますが、Collections#synchronizedListの方は失敗します。
これが、Infinispan ServerにデプロイしたServer Taskの実行時にも発生します。
なお、全Node(ALL_NODES)の実行モードで作成したServer Taskはデプロイ自体は可能ですし、Marshallingの仕組みを
Java標準のシリアライズにすれば実行可能になります。
おそらく、Cacheに保存するデータはProtoStreamでエンコーディングし、Server Taskの実行時にHot Rod Clientと
Infinispan Serverの間のやり取りをJava標準のシリアライズの仕組みとすれば、データの保存はProtoStreamのままでも
良いかもしれません。が、そこまでは試していません。
ちなみに、ServerNGとなる前まではCollections#synchronizedListではなくArrayListだったようです。
実行モードを全Nodeにすると、パラメーターがStringに強制変換される
それでもと、全Node(ALL_NODES)で動かしてみたのですが、ちょっと驚くことにこの場合はパラメーターが
Stringに強制変換されます。
以下が、単一Node向けに作成したServer Taskの、全Node(ALL_NODES)版です。
task/src/main/java/org/littlewings/infinispan/task/PriceSumAllNodesTask.java
package org.littlewings.infinispan.task; import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; import org.infinispan.Cache; import org.infinispan.tasks.ServerTask; import org.infinispan.tasks.TaskContext; import org.infinispan.tasks.TaskExecutionMode; import org.jboss.logging.Logger; import org.littlewings.infinispan.entity.Book; import org.littlewings.infinispan.entity.Result; public class PriceSumAllNodesTask implements ServerTask<Result> { TaskContext taskContext; @Override public void setTaskContext(TaskContext taskContext) { Logger logger = Logger.getLogger(PriceSumAllNodesTask.class); logger.infof("set task context"); this.taskContext = taskContext; } @Override public Result call() throws Exception { Logger logger = Logger.getLogger(PriceSumAllNodesTask.class); logger.infof("start task"); Map<String, ?> parameters = taskContext.getParameters().orElse(Collections.emptyMap()); int greaterThanPrice; if (parameters.containsKey("greaterThanPrice")) { greaterThanPrice = Integer.parseInt((String) parameters.get("greaterThanPrice")); // ALL_NODESの時は値はStringのみ } else { greaterThanPrice = 0; } Cache<String, Book> cache = (Cache<String, Book>) taskContext.getCache().orElseThrow(); logger.infof("execution mode = %s, cache size = %d, greaterThan = %d", getExecutionMode(), cache.size(), greaterThanPrice); Result result = Result.create( cache .values() .stream() .filter(book -> book.getPrice() > greaterThanPrice) .map(book -> { Logger l = Logger.getLogger(PriceSumAllNodesTask.class); l.infof("map entry = %s", book.getIsbn()); return book.getPrice(); }) .collect(() -> Collectors.summingInt(price -> price)) ); logger.infof("end task, result = %d", result.getValue()); return result; } @Override public String getName() { return "price-sum-all-nodes-task"; } @Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ALL_NODES; // default, ONE_NODE } }
Service Loaderの仕組みでロードするファイルは、こうなります。
task/src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.task.PriceSumTask org.littlewings.infinispan.task.PriceSumAllNodesTask
実行モードは当然違うのですが
@Override public TaskExecutionMode getExecutionMode() { return TaskExecutionMode.ALL_NODES; // default, ONE_NODE }
その他に変わった差異として、パラメーターがStringになっています。
if (parameters.containsKey("greaterThanPrice")) { greaterThanPrice = Integer.parseInt((String) parameters.get("greaterThanPrice")); // ALL_NODESの時は値はStringのみ } else { greaterThanPrice = 0; }
これはどうしたことだろう?と思ったのですが、意図的に(?)Object#toStringしていますね。
これは、パラメーターをProtoStreamでMarshallingする関係上、型を決めないといけなかったのかな?と思うのですが、
どうでしょうか。
なお、単一Nodeの場合は、このようなことは発生しません。
Collections#synchronizedListがMarshallingできないことも含めて、全Nodeでの実行モードで動かした時のテストコードはこちらです。
@Test public void allNodesTask() { this.<String, Book>withRemoteCache("bookCache", cache -> { books.forEach(b -> cache.put(b.getIsbn(), b)); assertThatThrownBy(() -> cache.execute("price-sum-all-nodes-task", Collections.emptyMap())) .isInstanceOf(HotRodClientException.class) .hasMessage("org.infinispan.commons.marshall.MarshallingException: ISPN000615: Unable to unmarshall 'java.util.Collections$SynchronizedRandomAccessList' as a marshaller is not present in the user or global SerializationContext"); cache.clear(); assertThat(cache).isEmpty(); }); } @Test public void allNodesTaskWithParameters() { this.<String, Book>withRemoteCache("bookCache", cache -> { books.forEach(b -> cache.put(b.getIsbn(), b)); Map<String, Object> parameters = Map.of("greaterThanPrice", 5000); assertThatThrownBy(() -> cache.execute("price-sum-all-nodes-task", parameters)) .isInstanceOf(HotRodClientException.class) .hasMessage("org.infinispan.commons.marshall.MarshallingException: ISPN000615: Unable to unmarshall 'java.util.Collections$SynchronizedRandomAccessList' as a marshaller is not present in the user or global SerializationContext"); cache.clear(); assertThat(cache).isEmpty(); }); }
ハマったことと、その記録でした…。