Infinispanのドキュメントに載っていないAPIで、Remote Task Executionというものがあることに
JBoss Data Gridのドキュメントを見ていて気づきました。
Chapter 32. Remote Task Execution - Red Hat Customer Portal
Infinispanのリポジトリ上でも、それっぽいAPIがあるのにはなんとなく気づいていたのですが、
ちゃんとしたドキュメントを見るのがJBoss Data Grid上のものが初めてになるとは思っていませんでした…。
でも、見てみると8.2からの機能っぽい…?だったら、ドキュメントなくてもまあ…?
Remote Task Executionとは?
Infinispan Server上にタスクをデプロイして、Infinispan Server上で直接タスクを実行させる仕組みになります。
グリッドを構成する単一のNode、もしくは全Node上でタスクを実行することができます。
ドキュメント自体と、登場するAPIはそれほど多くないので、以下をさらっと見るとだいたい概要は
つかめると思います。
32.2. Installing Remote Tasks - Red Hat Customer Portal
32.3. Removing Remote Tasks - Red Hat Customer Portal
32.4. Running Remote Tasks - Red Hat Customer Portal
と思っていたら、バッチリハマったわけですが…。
まあ、順次見ていってみましょう。
準備
まずは、ビルド定義から。最初に、最終形を載せます。
build.sbt
name := "remote-task" version := "0.0.1-SNAPSHOT" scalaVersion := "2.12.1" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-unchecked", "-deprecation", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) assemblyJarName in assembly := "remote-task.jar" assemblyMergeStrategy in assembly := { case "org/littlewings/infinispan/task/entity/Book.class" => MergeStrategy.discard case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) } packageOptions in (Compile, packageBin) += Package.ManifestAttributes("Dependencies" -> "org.littlewings.task.entity") test in assembly := {} fork in Test := true libraryDependencies ++= Seq( "org.infinispan" % "infinispan-tasks-api" % "8.2.6.Final" % Provided, "net.jcip" % "jcip-annotations" % "1.0" % Provided, "org.infinispan" % "infinispan-client-hotrod" % "8.2.6.Final" % Test, "org.scalatest" %% "scalatest" % "3.0.1" % Test )
プラグイン設定。build.sbtにも設定がちょこっと登場していますが、sbt-assemblyを使用しています。
project/plugins.sbt
logLevel := Level.Warn addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
詳細は内容を進めるにあたって説明を入れていきますが、依存関係だけ最初に説明しておきます。
少なくともRemote Task Executionを使うために必要な依存関係は、「infinispan-tasks-api」となります。
"org.infinispan" % "infinispan-tasks-api" % "8.2.6.Final" % Provided,
スコープはprovidedでOKです。
※なお、Scalaで書いているため「jcip-annotations」が必要になっていますが、Javaで実装する際には不要です
また、タスクを呼び出すにはHot Rod Clientが必要になります。
"org.infinispan" % "infinispan-client-hotrod" % "8.2.6.Final" % Test,
今回はテストコード中でHot Rod Clientを使用するので、スコープはtestとしてあります。テスティングフレームワーク
としては、ScalaTestとしました。
Infinispan Serverの用意
Infinispan Serverでタスクを実行すると言っている以上、Infinispan Serverが必要になります。
このエントリでは、Infinispan Serverを1 Node、そして3 Nodeでクラスタを構成するケースで書いてみたいと
思います。
3台のInfinispan Serverで動作させる時は、Infinispan Serverに対して行った管理系のコマンド(たとえばユーザーの追加、
Cacheの追加)は、各Infinispan Serverに対してそれぞれ行うこととします。
各Infinispan Serverが動作するホストの名前は、便宜上infinispan1、infinispan2、infinispan3とします。
Infinispan Serverの起動コマンドは、以下とします。
$ bin/standalone.sh -c clustered.xml -Djboss.bind.address=<ServerのIPアドレス>
Cacheの作成や、タスクのデプロイをすることになるので、Infinispan Server上にユーザーを作成しておきます。
※各Infinispan Serverに対して、コマンドを実行します
$ bin/add-user.sh -u test -p testpassword
今回は、ユーザー名「test」、パスワード「testpassword」でManagement Userを作成しました。
あと、最初に使うCacheも作成しておきましょう。Cacheの種類はDistributed Cacheとします。
※各Infinispan Serverに対して、コマンドを実行します
$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="/subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=simple:add(start=EAGER,mode=SYNC,owners=2)" $ bin/ispn-cli.sh -u=test -p=testpassword -c --command="/subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=simpleCache:add(configuration=simple)"
ここで作成したCacheの名前は、「simpleCache」となります。
テストコードの雛形
続いて、テストコードの雛形を用意します。
以下のimport文があるものとし(自作のクラスも含まれていますが)、RemoteCacheを使うためのヘルパーメソッドを設けます。
src/test/scala/org/littlewings/infinispan/task/ServerTaskSpec.scala
package org.littlewings.infinispan.task import org.infinispan.client.hotrod.configuration.ConfigurationBuilder import org.infinispan.client.hotrod.{RemoteCache, RemoteCacheManager} import org.littlewings.infinispan.task.entity.Book import org.scalatest.{FunSuite, Matchers} import scala.collection.JavaConverters._ class ServerTaskSpec extends FunSuite with Matchers { // ここに、テストを書く! def withRemoteCache[K, V](cacheName: String)(fun: RemoteCache[K, V] => Unit): Unit = { val manager = new RemoteCacheManager( new ConfigurationBuilder().addServers("172.17.0.2:11222").build() ) try { val cache = manager.getCache[K, V](cacheName) fun(cache) cache.stop() } finally { manager.stop() } } }
Infinispan Serverへの接続先はとりあえず最初としては「172.17.0.2」としていますが、Infinispan Server側で
クラスタが構成されていれば、Hot Rod Client側でも自動的に追加されたInfinispan Serverを認識します。
val manager = new RemoteCacheManager( new ConfigurationBuilder().addServers("172.17.0.2:11222").build() )
ここまでで、最初の準備は完了です。
はじめてのRemote Task
それでは、タスクを作成していきます。
こちらにも書いていますが、基本的にはServerTaskインターフェースを実装したクラスを作成するだけになります。
ServerTaskインターフェースはCallableインターフェースを拡張したものなのですが、その他に少なくとも以下の
3つのメソッドを実装する必要があります。
- call … タスクの処理そのもの
- setTaskContext … Cacheやその他のリソースにアクセスするために使用
- getName … タスク単位にユニークな名前を指定する(Hot Rod Clientからの呼び出し時に使用)
また、必要に応じて以下の2つのメソッドをオーバーライドすることができます。デフォルト値は、defaultメソッド
として実装されています。
- getExecutionMode … タスクを実行するNodeを、TaskExecutionMode.ONE_NODE(単一Node/デフォルト)またはTaskExecutionMode.ALL_NODES(全Node)で指定
- getAllowedRole … タスクを実行できるRoleをOptionalで指定。デフォルトはOptional.emptyで特にRole制限はない
では、最初に「単一Nodeで動作し」、「タスクを呼び出す際に受け取った引数を使って結果を返す」タスクを作ってみます。
作成したコードは、こちら。
src/main/scala/org/littlewings/infinispan/task/OneNodeSimpleTask.scala
package org.littlewings.infinispan.task import java.net.InetAddress import org.infinispan.tasks.{ServerTask, TaskContext} class OneNodeSimpleTask extends ServerTask[String] { private[task] var taskContext: TaskContext = _ override def getName: String = "oneNodeSimpleTask" override def setTaskContext(taskContext: TaskContext): Unit = this.taskContext = taskContext override def call: String = { val cache = taskContext.getCache val parameters = taskContext.getParameters val name = if (parameters.isPresent) parameters.get.get("name").asInstanceOf[String] else "Task" val cacheName = if (cache.isPresent) cache.get.getName else "nonCache" s"Hello ${name}!!, Cache[${cacheName}] by ${InetAddress.getLocalHost.getHostName}" } }
最初なので、説明を。
まず、ServerTaskインターフェースを実装するわけですが、ServerTaskインターフェースは型パラメーターを取ります。
ここで指定した型パラメーターが、タスクの戻り値となります。
今回は、Stringとしました。
class OneNodeSimpleTask extends ServerTask[String] {
となると、callメソッドの戻り値はStringとなります。
override def call: String = {
getNameメソッドでは、このタスクの名前を指定します。Infinispan Server上で、ユニークである必要があるみたいです。
override def getName: String = "oneNodeSimpleTask"
で、setTaskContextメソッドでTaskContextを受け取って
override def setTaskContext(taskContext: TaskContext): Unit = this.taskContext = taskContext
callメソッドで、やりたい処理を実装します。
override def call: String = { val cache = taskContext.getCache val parameters = taskContext.getParameters val name = if (parameters.isPresent) parameters.get.get("name").asInstanceOf[String] else "Task" val cacheName = if (cache.isPresent) cache.get.getName else "nonCache" s"Hello ${name}!!, Cache[${cacheName}] by ${InetAddress.getLocalHost.getHostName}" }
TaskContextからは、呼び出しに使用されたCache、呼び出し時の引数、そして(今回は使っていませんが)Marshallerを
取得することができます。
今回は、呼び出し時の引数「name」、タスクを実行したCache名、そしてInfinispan Serverが動作しているホスト名を
返却するように実装してみました。
s"Hello ${name}!!, Cache[${cacheName}] by ${InetAddress.getLocalHost.getHostName}"
ちなみに、TaskContextをこういうふうに受け取っていると、そもそもServerTaskのライフサイクルとの関係が
気になるところですが…
override def setTaskContext(taskContext: TaskContext): Unit = this.taskContext = taskContext
どうも、ServerTaskはSingletonのようです。なので、同じタスクを同時に呼び出されたりすると困ったことになりそうなので、
場合によってはThreadLocalとか使った方がいいんでしょうかねぇ…。
ここまでで、ServerTaskの実装ができあがりました。
それでは、このタスクをJARファイルにパッケージングします。
…の前に、META-INF/servicesにService Providerの仕組みに乗せた設定を書く必要があります。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.task.OneNodeSimpleTask
これ、JBoss Data Gridのドキュメントにも書いていなくて、テストコードを見てやっと気づきました…。
続いて、JARファイルにパッケージングするわけですが、単一のJARファイルとしてデプロイする必要があります。
今回、依存関係にScalaが入っているのでScalaを含めた形のJARファイルを作成する必要があります。
なので、sbt-assemblyを使用しています。
できあがるJARファイルの名前は、「remote-task.jar」、テスト時にassemblyは作成しないように設定。
assemblyJarName in assembly := "remote-task.jar"
test in assembly := {}
で、JARファイル作成。
> assembly
これを、Infinispan Serverにデプロイします。
※各Infinispan Serverに対して、コマンドを実行します
$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="deploy /path/to/remote-task.jar"
デプロイすると、コンソールにこんな感じのログが出力され、作成したServerTaskが認識されていることが確認できます。
2017-03-28 13:25:52,326 INFO [org.jboss.as.clustering.infinispan] (MSC service thread 1-8) DGISPN0011: Installing ServerTask service implementation 'org.littlewings.infinispan.task.OneNodeSimpleTask'
では、テストコードで動作確認してみましょう。簡単のため、今回はInfinispan Serverの1 Nodeのみを対象とします
(サーバー名は、infinispan1)。
できあがったコードは、こちら。利用するCacheは、準備段階で作成したCacheにしました(デフォルトのものでもかまいませんが)。
test("one-node simple-task") { withRemoteCache[String, String]("simpleCache") { cache => val parameters = Map("name" -> "MyFirstTask").asJava val result = cache.execute[String]("oneNodeSimpleTask", parameters) result should be("Hello MyFirstTask!!, Cache[simpleCache] by infinispan1") } }
ポイントですが、タスクを呼び出す際のパラメーターは、java.util.Mapとして作成します。
※ここではScalaのMapからjava.util.Mapに変換していますが
val parameters = Map("name" -> "MyFirstTask").asJava
あとは、RemoteCache#executeで、デプロイしたタスクの名前とともに、パラメーターを渡すとタスクを呼び出すことができます。
val result = cache.execute[String]("oneNodeSimpleTask", parameters)
これで、初めてのRemote Taskの実装と確認が完了しました。
全Nodeでタスクを実行する
先ほどは、単一のNodeで動作するタスクを作成しましたが、今度は全Nodeでタスクを実行するように実装してみます。
ここから先は、Infinspan Serverが3 Node起動している前提とします。
作成したのは、こちら。内容的には、先ほどの単一Nodeで動作させるタスクとほぼ同じ実装です。
src/main/scala/org/littlewings/infinispan/task/AllNodeSimpleTask.scala
package org.littlewings.infinispan.task import java.net.InetAddress import org.infinispan.tasks.{ServerTask, TaskContext, TaskExecutionMode} class AllNodeSimpleTask extends ServerTask[String] { private[task] var taskContext: TaskContext = _ override def getName: String = "allNodeSimpleTask" override def setTaskContext(taskContext: TaskContext): Unit = this.taskContext = taskContext override def getExecutionMode: TaskExecutionMode = TaskExecutionMode.ALL_NODES override def call: String = { val cache = taskContext.getCache val parameters = taskContext.getParameters val name = if (parameters.isPresent) parameters.get.get("name").asInstanceOf[String] else "Task" val cacheName = if (cache.isPresent) cache.get.getName else "nonCache" s"Hello ${name}!!, Cache[${cacheName}] by ${InetAddress.getLocalHost.getHostName}" } }
異なるのは、タスク名が異なるというのと
override def getName: String = "allNodeSimpleTask"
getExecutionModeで、TaskExecutionMode.ALL_NODESを返すように実装しているということですね。
override def getExecutionMode: TaskExecutionMode = TaskExecutionMode.ALL_NODES
で、作成したクラス名を追記します。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.task.OneNodeSimpleTask org.littlewings.infinispan.task.AllNodeSimpleTask
あとはデプロイして、テストコードで確認しましょう。
補足)
すでにJARファイルをデプロイした状態でデプロイする場合は、1度アンデプロイして再度デプロイするか、「--force」オプションを使用して
強制的にデプロイする方法があります。以下は、「--force」を使った例です。
$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="deploy --force /path/to/remote-task.jar"
作成したテストコードは、こちら。
test("all-node simple-task") { withRemoteCache[String, String]("simpleCache") { cache => val parameters = Map("name" -> "MyFirstTask").asJava val results = cache.execute[java.util.List[String]]("allNodeSimpleTask", parameters) results should (contain("Hello MyFirstTask!!, Cache[simpleCache] by infinispan1") and contain("Hello MyFirstTask!!, Cache[simpleCache] by infinispan2") and contain("Hello MyFirstTask!!, Cache[simpleCache] by infinispan3")) } }
単一Nodeの時とほぼ同じですが、全Nodeに対して呼び出しを行う場合は、呼び出し結果がListとして返却されます。
val results = cache.execute[java.util.List[String]]("allNodeSimpleTask", parameters)
Listの要素数は、Node数と同じになります。
これで、全Nodeに対してタスクを実行することができました。
タスク内でCache内のエントリを扱う
先ほどまでは、タスク呼び出し時のパラメーターは使用していましたが、CacheについてはCacheの名前以外まったく触れていませんでした。
今度は、Cache内のエントリを扱うタスクを作成してみたいと思います。実行Nodeは、単一とします。
src/main/scala/org/littlewings/infinispan/task/CacheSimpleTask.scala
package org.littlewings.infinispan.task import org.infinispan.Cache import org.infinispan.tasks.{ServerTask, TaskContext} class CacheSimpleTask extends ServerTask[String] { private[task] var taskContext: TaskContext = _ override def getName: String = "cacheSimpleTask" override def setTaskContext(taskContext: TaskContext): Unit = this.taskContext = taskContext override def call: String = { val parameters = taskContext.getParameters.get val marshaller = taskContext.getMarshaller.get val cache = taskContext.getCache.get.asInstanceOf[Cache[Array[Byte], Array[Byte]]] val key = parameters.get("key") val keyAsBinary = marshaller.objectToByteBuffer(key) val value = marshaller.objectFromByteBuffer(cache.get(keyAsBinary)) s"key = ${key}, value = ${value}" } }
先ほどとちょっと変わったところとして、Marshallerが出てきました。Marshallerは、TaskContextから取得できます。
val marshaller = taskContext.getMarshaller.get
というか、TaskContextから取得したMarshallerを使う必要があります。その他の、EmbeddedCacheManagerから取得できる
Marshallerを使用してはいけません。
特にCompatibility Modeを有効にしていないCacheの場合、Hot Rodで使う場合はキーも値もbyte配列が入っていることに
なっています。
val cache = taskContext.getCache.get.asInstanceOf[Cache[Array[Byte], Array[Byte]]]
ここで、キーも値もStringと期待する場合、それぞれMarshallerを使ってbyte配列に変換、もしくはbyteから復元する必要があります。
val keyAsBinary = marshaller.objectToByteBuffer(key) val value = marshaller.objectFromByteBuffer(cache.get(keyAsBinary))
キー自体は、呼び出し元からパラメーターとして教えてもらうことにします。
val key = parameters.get("key")
これで、Cache内のエントリを使用するタスクを実装できました。
このタスクも、Service Providerの設定に追記して、デプロイします。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.task.OneNodeSimpleTask org.littlewings.infinispan.task.AllNodeSimpleTask org.littlewings.infinispan.task.CacheSimpleTask
テストコードで確認。
test("one-node cache-task") { withRemoteCache[String, String]("simpleCache") { cache => val parameters = Map("key" -> "simpleKey").asJava cache.put("simpleKey", "simpleValue") val result = cache.execute[String]("cacheSimpleTask", parameters) result should be("key = simpleKey, value = simpleValue") } }
今度は、あらかじめRemoteCache#putでデータを登録しておきます。
cache.put("simpleKey", "simpleValue")
それ以外は、まあふつうに…。
自作のクラスを使用する
ここまでは、CacheにString…Javaに標準で含まれているクラスしかタスク内で使用しませんでしたが、次は自分で作成した
クラスを使用してみたいと思います。
具体的には、書籍(Book)をお題に、以下のようなことをやってみます。
- 単体のNodeで実行するタスクとする
- Bookクラスをキャッシュの値として登録(キーはISBN:Stringとする)
- Book自体をタスクのパラメーターとする(パラメーターとして自作のクラスが使えることの確認)
- パラメーターとなったBookのISBNを使って、Cache内のBookを取得する(Cacheに登録された自作のクラスが使えることの確認)
- 価格を2倍したBookのインスタンスを生成し、戻り値として返却する(戻り値に自作のクラスが使えることの確認)
お題となるBookクラスのソースコードは、こちら。このあとで説明しますが、都合によりJavaで書いています。
src/main/java/org/littlewings/infinispan/task/entity/Book.java
package org.littlewings.infinispan.task.entity; import java.io.Serializable; public class Book implements Serializable { private static final long serialVersionUID = 1L; private String isbn; private String title; private int price; public Book(String isbn, String title, int price) { this.isbn = isbn; this.title = title; this.price = price; } // getter、setterは省略 }
ServerTaskの実装自体は、先ほどMarshallerを使ったサンプルと、そう変わらない形で実装できます。
src/main/scala/org/littlewings/infinispan/task/BookPriceDoublingTask.scala
package org.littlewings.infinispan.task import org.infinispan.Cache import org.infinispan.tasks.{ServerTask, TaskContext} import org.littlewings.infinispan.task.entity.Book class BookPriceDoublingTask extends ServerTask[Book] { private[task] var taskContext: TaskContext = _ override def getName: String = "bookPriceDoublingTask" override def setTaskContext(taskContext: TaskContext): Unit = this.taskContext = taskContext override def call: Book = { val parameters = taskContext.getParameters.get val marshaller = taskContext.getMarshaller.get val cache = taskContext.getCache.get.asInstanceOf[Cache[Array[Byte], Array[Byte]]] // parameter val parameterBook = parameters.get("target").asInstanceOf[Book] // query val keyAsBinary = marshaller.objectToByteBuffer(parameterBook.getIsbn) val cacheBook = marshaller.objectFromByteBuffer(cache.get(keyAsBinary)).asInstanceOf[Book] // new new Book(cacheBook.getIsbn, cacheBook.getTitle, cacheBook.getPrice * 2) } }
Cacheはやはりbyte配列でのキー/値となっているので
val cache = taskContext.getCache.get.asInstanceOf[Cache[Array[Byte], Array[Byte]]]
Marshallerでオブジェクトからbyte配列、byte配列からオブジェクトへの変換をしてあげる必要があります。
// query val keyAsBinary = marshaller.objectToByteBuffer(parameterBook.getIsbn) val cacheBook = marshaller.objectFromByteBuffer(cache.get(keyAsBinary)).asInstanceOf[Book]
Service Providerの設定にも、作成したタスクを追加します。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.task.OneNodeSimpleTask org.littlewings.infinispan.task.AllNodeSimpleTask org.littlewings.infinispan.task.CacheSimpleTask org.littlewings.infinispan.task.BookPriceDoublingTask
あとはデプロイすればOK…と思いきや、そのようにしてタスクを実行すると、Server側でClassNotFoundExceptionを
見ることになります。
これは、MarshallerからBookクラスが見えないためです。これを回避するために、Infinispan Serverのモジュールとして
Bookクラスを追加します。
まず、Bookクラスのみを含めたJARファイルを作成します。JARファイルの名前は、「entity.jar」としましょう。
$ jar -cvf entity.jar -C classes org/littlewings/infinispan/task/entity/Book.class
このJARファイルを、Infinispan Serverの「modules/system/layers/base」ディレクトリ配下に登録します。
※ここから先のInfinispan Serverへの操作は、各Infinispan Serverに対してコマンドを実行します
まずはディレクトリ作成。「modules/system/layers/base/org/littlewings/task/entity/main」配下にJARファイルを
置くようにしましょう。
$ mkdir -p modules/system/layers/base/org/littlewings/task/entity/main
module.xmlを作成します。
$ vim modules/system/layers/base/org/littlewings/task/entity/main/module.xml
内容は、こんな感じで。
<?xml version="1.0" encoding="UTF-8"?> <module xmlns="urn:jboss:module:1.3" name="org.littlewings.task.entity"> <resources> <resource-root path="entity.jar"/> </resources> </module>
続いて、「infinispan-commons」モジュールに、追加したモジュールを参照するように設定変更します。
$ vim modules/system/layers/base/org/infinispan/commons/main/module.xml
こんな感じで。JBoss Marshallerよりも前にある必要があった…はずです。
<?xml version="1.0" encoding="UTF-8"?> <module xmlns="urn:jboss:module:1.3" name="org.infinispan.commons"> <resources> <resource-root path="infinispan-commons.jar"/> </resources> <dependencies> <module name="javax.api"/> <module name="javax.transaction.api"/> <module name="org.littlewings.task.entity"/> <module name="org.jboss.logging"/> <module name="org.jboss.marshalling" services="import"/> <module name="sun.jdk"/> </dependencies> </module>
ここまで実行したら、Infinispan Serverを再起動しましょう。
ここで、先ほどからデプロイしている「remote-task.jar」ファイル内に含まれているMANIFEST.MFファイルに、
以下の内容を追記する必要があります。
Dependencies: org.littlewings.task.entity
このため、sbt-assemblyの設定で、MANIFEST.MFに上記の内容が出力されるように設定を行います。
packageOptions in (Compile, packageBin) += Package.ManifestAttributes("Dependencies" -> "org.littlewings.task.entity")
また、「remote-task.jar」にBookクラス自体が含まれてはいけません。同じくsbt-assemblyの設定で、「remote-task.jar」から
Bookクラスを除外するように設定します。
assemblyMergeStrategy in assembly := { case "org/littlewings/infinispan/task/entity/Book.class" => MergeStrategy.discard case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) }
あとはタスクを含んだJARファイルをデプロイすればOKですが、今回はCacheを分けましょう。
新しいDistributed Cacheをひとつ追加します。設定は、最初に作成した「simpleCache」と同じで、「bookCache」というCacheを追加。
※各Infinispan Serverに対して、コマンドを実行します
$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="/subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=bookCache:add(configuration=simple)"
で、JARファイルをデプロイ後、テストコードで確認することができます。
test("one-node include original-class") { withRemoteCache[String, Book]("bookCache") { cache => val book = new Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4543) cache.put(book.getIsbn, book) val parameters = Map("target" -> book).asJava val resultBook = cache.execute[Book]("bookPriceDoublingTask", parameters) resultBook.getIsbn should be(book.getIsbn) resultBook.getTitle should be(book.getTitle) resultBook.getPrice should be(book.getPrice * 2) } }
Compatibility Modeを有効にした場合
先ほどのBookクラスを使用した例ですが、今度はCacheのCompatibility Modeを有効にした状態で確認してみましょう。
まずは、Compatibility Modeを有効にしたConfigurationとCacheを追加します。Cacheの名前は、「compatibilityBookCache」としました。
※設定は、Compatibility Mode以外は「simple」と同じです
※各Infinispan Serverに対して、コマンドを実行します
$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="/subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=compatibility:add(start=EAGER,mode=SYNC,owners=2)" $ bin/ispn-cli.sh -u=test -p=testpassword -c --command="/subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=compatibility/compatibility=COMPATIBILITY:add(enabled=true)"
で、タスクを作成します。
src/main/scala/org/littlewings/infinispan/task/CompatibilityBookPriceDoublingTask.scala
package org.littlewings.infinispan.task import org.infinispan.Cache import org.infinispan.tasks.{ServerTask, TaskContext} import org.littlewings.infinispan.task.entity.Book class CompatibilityBookPriceDoublingTask extends ServerTask[Book] { private[task] var taskContext: TaskContext = _ override def getName: String = "compatibilityBookPriceDoublingTask" override def setTaskContext(taskContext: TaskContext): Unit = this.taskContext = taskContext override def call: Book = { val parameters = taskContext.getParameters.get val marshaller = taskContext.getMarshaller.get val cache = taskContext.getCache.get.asInstanceOf[Cache[String, Array[Byte]]] // parameter val parameterBook = parameters.get("target").asInstanceOf[Book] // query val isbn = parameterBook.getIsbn val cacheBook = marshaller.objectFromByteBuffer(cache.get(isbn)).asInstanceOf[Book] // new new Book(cacheBook.getIsbn, cacheBook.getTitle, cacheBook.getPrice * 2) } }
キーに対してMarshallerは不要になるのですが、値に対しては相変わらずMarshallerが必要だったりします…。
val cache = taskContext.getCache.get.asInstanceOf[Cache[String, Array[Byte]]] // parameter val parameterBook = parameters.get("target").asInstanceOf[Book] // query val isbn = parameterBook.getIsbn val cacheBook = marshaller.objectFromByteBuffer(cache.get(isbn)).asInstanceOf[Book]
このあたりの挙動、実はHot Rod越しのRemote Scriptingと同じだったりします…。
http://d.hatena.ne.jp/Kazuhira/20151223/1450884828
うーん…。
こちらも、Service Providerの設定に追加してデプロイ。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.task.OneNodeSimpleTask org.littlewings.infinispan.task.AllNodeSimpleTask org.littlewings.infinispan.task.CacheSimpleTask org.littlewings.infinispan.task.BookPriceDoublingTask org.littlewings.infinispan.task.CompatibilityBookPriceDoublingTask
テストコードは、そう変わりません。
test("one-node include original-class, with Compatibility mode") { withRemoteCache[String, Book]("compatibilityBookCache") { cache => val book = new Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4543) cache.put(book.getIsbn, book) val parameters = Map("target" -> book).asJava val resultBook = cache.execute[Book]("compatibilityBookPriceDoublingTask", parameters) resultBook.getIsbn should be(book.getIsbn) resultBook.getTitle should be(book.getTitle) resultBook.getPrice should be(book.getPrice * 2) } }
タスク内で、Distributed Stream APIを使う
最後に、タスク内でDistributed Stream APIを使ってみます。これを使うには、Compatibility Modeを有効にした
Cacheを使うのが無難な気がします…。
Distributed Cacheに対してDistributed Stream APIを使うので、タスクの実行Nodeは単体としましょう。
お題は、先ほど作成したBookのインスタンスを複数登録して、価格の合計を算出するタスクを作ってみます。
というわけで、Bookクラスを先ほどと同様、Infinispan Serverにモジュールとして登録していることを
前提とします。
で、作成したのがこちら。
src/main/scala/org/littlewings/infinispan/task/BookPriceSumTask.scala
package org.littlewings.infinispan.tas
import java.util.stream.{Collector, Collectors} import org.infinispan.Cache import org.infinispan.stream.{CacheCollectors, SerializableSupplier} import org.infinispan.tasks.{ServerTask, TaskContext} import org.littlewings.infinispan.task.entity.Book class BookPriceSumTask extends ServerTask[Int] { private[task] var taskContext: TaskContext = _ override def getName: String = "bookPriceSumTask" override def setTaskContext(taskContext: TaskContext): Unit = this.taskContext = taskContext override def call: Int = { val cache = taskContext.getCache.get.asInstanceOf[Cache[String, Book]] val stream = cache.entrySet.stream try { stream.map[Int](new java.util.function.Function[java.util.Map.Entry[String, Book], Int] with Serializable { override def apply(entity: java.util.Map.Entry[String, Book]): Int = entity.getValue.getPrice }).collect(CacheCollectors.serializableCollector[Int, Integer](new SerializableSupplier[Collector[Int, _, Integer]] { override def get: Collector[Int, _, Integer] = Collectors.summingInt[Int](i => Integer.valueOf(i)) })) } finally { stream.close() } } }
なんかですね、Distributed Stream APIを使った時は、Marshallerが不要みたいなんですけど、なんででしょう…。
かといって、MarshallerはSerializableではないですし、ここで渡しているFunctionやCollectorはSerializableである
必要があるので、もしも必要って言われるとそれはそれで困るのですが…。
型の記載がいろいろ面倒になっているのは、ScalaとJavaのStream APIの相性の悪さが効いています…。
あと、この実装ならStream#mapToIntとかにすればいいんじゃ?と言われそうですが、やったらサポートしてないよって
言われました…。
java.lang.UnsupportedOperationException: Primitive delegate is not yet supported!
で、このタスクもService Providerの設定に追加して、デプロイ。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask
org.littlewings.infinispan.task.OneNodeSimpleTask org.littlewings.infinispan.task.AllNodeSimpleTask org.littlewings.infinispan.task.CacheSimpleTask org.littlewings.infinispan.task.BookPriceDoublingTask org.littlewings.infinispan.task.CompatibilityBookPriceDoublingTask org.littlewings.infinispan.task.BookPriceSumTask
テストコードは、こちら。
test("one-node include original-class using Distributed Stream API") { withRemoteCache[String, Book]("compatibilityBookCache") { cache => val books = Array( new Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4543), new Book("978-1785285332", "Getting Started With Hazelcast", 3533), new Book("978-1849519205", "Hibernate Search by Example", 3365), new Book("978-1783988181", "Mastering Redis", 6102), new Book("978-1491933664", "Cassandra: The Definitive Guide", 4637), new Book("978-1449344689", "MongoDB: The Definitive Guide", 3925), new Book("978-1449358549", "Elasticsearch: The Definitive Guide", 5951), new Book("978-1784399641", "Apache Solr Essentials", 3301), new Book("978-1449396107", "HBase: The Definitive Guide", 4680) ) books.foreach(b => cache.put(b.getIsbn, b)) val result = cache.execute[Integer]("bookPriceSumTask", new java.util.HashMap[String, AnyRef]) result should be(40037) } }
タスクさえデプロイできていれば、あとは動かせるでしょう。
とりあえず、試してみたいことはざっと確認できました…。
まとめ
Infinispan Server上で任意の処理を動かす、Remote Task Executionを試してみました。
存在をちゃんと認識していませんでしたが、知っておくと観点としてはありなのかなぁとちょっと
思いました。
ここまで通すのに、だいぶハマりましたけど…。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-task