CLOVER🍀

That was when it all began.

InfinispanのRemote Task Executionを使って、Infinispan Server上でタスクを実行する

Infinispanのドキュメントに載っていないAPIで、Remote Task Executionというものがあることに
JBoss Data Gridのドキュメントを見ていて気づきました。

Chapter 32. Remote Task Execution - Red Hat Customer Portal

Infinispanのリポジトリ上でも、それっぽいAPIがあるのにはなんとなく気づいていたのですが、
ちゃんとしたドキュメントを見るのがJBoss Data Grid上のものが初めてになるとは思っていませんでした…。

でも、見てみると8.2からの機能っぽい…?だったら、ドキュメントなくてもまあ…?

Remote Task Executionとは?

Infinispan Server上にタスクをデプロイして、Infinispan Server上で直接タスクを実行させる仕組みになります。

グリッドを構成する単一のNode、もしくは全Node上でタスクを実行することができます。

ドキュメント自体と、登場するAPIはそれほど多くないので、以下をさらっと見るとだいたい概要は
つかめると思います。

CREATING A REMOTE TASK

32.2. Installing Remote Tasks - Red Hat Customer Portal

32.3. Removing Remote Tasks - Red Hat Customer Portal

32.4. Running Remote Tasks - Red Hat Customer Portal

と思っていたら、バッチリハマったわけですが…。

まあ、順次見ていってみましょう。

準備

まずは、ビルド定義から。最初に、最終形を載せます。
build.sbt

name := "remote-task"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.12.1"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-unchecked", "-deprecation", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

assemblyJarName in assembly := "remote-task.jar"

assemblyMergeStrategy in assembly := {
  case "org/littlewings/infinispan/task/entity/Book.class" => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

packageOptions in (Compile, packageBin) +=
    Package.ManifestAttributes("Dependencies" -> "org.littlewings.task.entity")

test in assembly := {}

fork in Test := true

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-tasks-api" % "8.2.6.Final" % Provided,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.infinispan" % "infinispan-client-hotrod" % "8.2.6.Final" % Test,
  "org.scalatest" %% "scalatest" % "3.0.1" % Test
)

プラグイン設定。build.sbtにも設定がちょこっと登場していますが、sbt-assemblyを使用しています。
project/plugins.sbt

logLevel := Level.Warn

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")

詳細は内容を進めるにあたって説明を入れていきますが、依存関係だけ最初に説明しておきます。

少なくともRemote Task Executionを使うために必要な依存関係は、「infinispan-tasks-api」となります。

  "org.infinispan" % "infinispan-tasks-api" % "8.2.6.Final" % Provided,

スコープはprovidedでOKです。
※なお、Scalaで書いているため「jcip-annotations」が必要になっていますが、Javaで実装する際には不要です

また、タスクを呼び出すにはHot Rod Clientが必要になります。

  "org.infinispan" % "infinispan-client-hotrod" % "8.2.6.Final" % Test,

今回はテストコード中でHot Rod Clientを使用するので、スコープはtestとしてあります。テスティングフレームワーク
としては、ScalaTestとしました。

Infinispan Serverの用意

Infinispan Serverでタスクを実行すると言っている以上、Infinispan Serverが必要になります。

このエントリでは、Infinispan Serverを1 Node、そして3 Nodeでクラスタを構成するケースで書いてみたいと
思います。

3台のInfinispan Serverで動作させる時は、Infinispan Serverに対して行った管理系のコマンド(たとえばユーザーの追加、
Cacheの追加)は、各Infinispan Serverに対してそれぞれ行うこととします。

各Infinispan Serverが動作するホストの名前は、便宜上infinispan1、infinispan2、infinispan3とします。

Infinispan Serverの起動コマンドは、以下とします。

$ bin/standalone.sh -c clustered.xml -Djboss.bind.address=<ServerのIPアドレス>

Cacheの作成や、タスクのデプロイをすることになるので、Infinispan Server上にユーザーを作成しておきます。
※各Infinispan Serverに対して、コマンドを実行します

$ bin/add-user.sh -u test -p testpassword

今回は、ユーザー名「test」、パスワード「testpassword」でManagement Userを作成しました。

あと、最初に使うCacheも作成しておきましょう。Cacheの種類はDistributed Cacheとします。
※各Infinispan Serverに対して、コマンドを実行します

$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="/subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=simple:add(start=EAGER,mode=SYNC,owners=2)"
$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="/subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=simpleCache:add(configuration=simple)"

ここで作成したCacheの名前は、「simpleCache」となります。

テストコードの雛形

続いて、テストコードの雛形を用意します。

以下のimport文があるものとし(自作のクラスも含まれていますが)、RemoteCacheを使うためのヘルパーメソッドを設けます。
src/test/scala/org/littlewings/infinispan/task/ServerTaskSpec.scala

package org.littlewings.infinispan.task

import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.infinispan.client.hotrod.{RemoteCache, RemoteCacheManager}
import org.littlewings.infinispan.task.entity.Book
import org.scalatest.{FunSuite, Matchers}

import scala.collection.JavaConverters._

class ServerTaskSpec extends FunSuite with Matchers {
  // ここに、テストを書く!

  def withRemoteCache[K, V](cacheName: String)(fun: RemoteCache[K, V] => Unit): Unit = {
    val manager = new RemoteCacheManager(
      new ConfigurationBuilder().addServers("172.17.0.2:11222").build()
    )

    try {
      val cache = manager.getCache[K, V](cacheName)
      fun(cache)
      cache.stop()
    } finally {
      manager.stop()
    }
  }
}

Infinispan Serverへの接続先はとりあえず最初としては「172.17.0.2」としていますが、Infinispan Server側で
クラスタが構成されていれば、Hot Rod Client側でも自動的に追加されたInfinispan Serverを認識します。

    val manager = new RemoteCacheManager(
      new ConfigurationBuilder().addServers("172.17.0.2:11222").build()
    )

ここまでで、最初の準備は完了です。

はじめてのRemote Task

それでは、タスクを作成していきます。

こちらにも書いていますが、基本的にはServerTaskインターフェースを実装したクラスを作成するだけになります。

CREATING A REMOTE TASK

ServerTaskインターフェースはCallableインターフェースを拡張したものなのですが、その他に少なくとも以下の
3つのメソッドを実装する必要があります。

  • call … タスクの処理そのもの
  • setTaskContext … Cacheやその他のリソースにアクセスするために使用
  • getName … タスク単位にユニークな名前を指定する(Hot Rod Clientからの呼び出し時に使用)

また、必要に応じて以下の2つのメソッドをオーバーライドすることができます。デフォルト値は、defaultメソッド
として実装されています。

  • getExecutionMode … タスクを実行するNodeを、TaskExecutionMode.ONE_NODE(単一Node/デフォルト)またはTaskExecutionMode.ALL_NODES(全Node)で指定
  • getAllowedRole … タスクを実行できるRoleをOptionalで指定。デフォルトはOptional.emptyで特にRole制限はない

では、最初に「単一Nodeで動作し」、「タスクを呼び出す際に受け取った引数を使って結果を返す」タスクを作ってみます。
作成したコードは、こちら。
src/main/scala/org/littlewings/infinispan/task/OneNodeSimpleTask.scala

package org.littlewings.infinispan.task

import java.net.InetAddress

import org.infinispan.tasks.{ServerTask, TaskContext}

class OneNodeSimpleTask extends ServerTask[String] {
  private[task] var taskContext: TaskContext = _

  override def getName: String = "oneNodeSimpleTask"

  override def setTaskContext(taskContext: TaskContext): Unit =
    this.taskContext = taskContext

  override def call: String = {
    val cache = taskContext.getCache
    val parameters = taskContext.getParameters

    val name =
      if (parameters.isPresent) parameters.get.get("name").asInstanceOf[String]
      else "Task"

    val cacheName =
      if (cache.isPresent) cache.get.getName
      else "nonCache"

    s"Hello ${name}!!, Cache[${cacheName}] by ${InetAddress.getLocalHost.getHostName}"
  }
}

最初なので、説明を。

まず、ServerTaskインターフェースを実装するわけですが、ServerTaskインターフェースは型パラメーターを取ります。
ここで指定した型パラメーターが、タスクの戻り値となります。

今回は、Stringとしました。

class OneNodeSimpleTask extends ServerTask[String] {

となると、callメソッドの戻り値はStringとなります。

  override def call: String = {

getNameメソッドでは、このタスクの名前を指定します。Infinispan Server上で、ユニークである必要があるみたいです。

  override def getName: String = "oneNodeSimpleTask"

で、setTaskContextメソッドでTaskContextを受け取って

  override def setTaskContext(taskContext: TaskContext): Unit =
    this.taskContext = taskContext

callメソッドで、やりたい処理を実装します。

  override def call: String = {
    val cache = taskContext.getCache
    val parameters = taskContext.getParameters

    val name =
      if (parameters.isPresent) parameters.get.get("name").asInstanceOf[String]
      else "Task"

    val cacheName =
      if (cache.isPresent) cache.get.getName
      else "nonCache"

    s"Hello ${name}!!, Cache[${cacheName}] by ${InetAddress.getLocalHost.getHostName}"
  }

TaskContextからは、呼び出しに使用されたCache、呼び出し時の引数、そして(今回は使っていませんが)Marshallerを
取得することができます。

今回は、呼び出し時の引数「name」、タスクを実行したCache名、そしてInfinispan Serverが動作しているホスト名を
返却するように実装してみました。

    s"Hello ${name}!!, Cache[${cacheName}] by ${InetAddress.getLocalHost.getHostName}"

ちなみに、TaskContextをこういうふうに受け取っていると、そもそもServerTaskのライフサイクルとの関係が
気になるところですが…

  override def setTaskContext(taskContext: TaskContext): Unit =
    this.taskContext = taskContext

どうも、ServerTaskはSingletonのようです。なので、同じタスクを同時に呼び出されたりすると困ったことになりそうなので、
場合によってはThreadLocalとか使った方がいいんでしょうかねぇ…。

ここまでで、ServerTaskの実装ができあがりました。

それでは、このタスクをJARファイルにパッケージングします。

…の前に、META-INF/servicesにService Providerの仕組みに乗せた設定を書く必要があります。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask

org.littlewings.infinispan.task.OneNodeSimpleTask

これ、JBoss Data Gridのドキュメントにも書いていなくて、テストコードを見てやっと気づきました…。

続いて、JARファイルにパッケージングするわけですが、単一のJARファイルとしてデプロイする必要があります。
今回、依存関係にScalaが入っているのでScalaを含めた形のJARファイルを作成する必要があります。

なので、sbt-assemblyを使用しています。

できあがるJARファイルの名前は、「remote-task.jar」、テスト時にassemblyは作成しないように設定。

assemblyJarName in assembly := "remote-task.jar"

test in assembly := {}

で、JARファイル作成。

> assembly

これを、Infinispan Serverにデプロイします。
※各Infinispan Serverに対して、コマンドを実行します

$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="deploy /path/to/remote-task.jar"

デプロイすると、コンソールにこんな感じのログが出力され、作成したServerTaskが認識されていることが確認できます。

2017-03-28 13:25:52,326 INFO  [org.jboss.as.clustering.infinispan] (MSC service thread 1-8) DGISPN0011: Installing ServerTask service implementation 'org.littlewings.infinispan.task.OneNodeSimpleTask'

では、テストコードで動作確認してみましょう。簡単のため、今回はInfinispan Serverの1 Nodeのみを対象とします
(サーバー名は、infinispan1)。

できあがったコードは、こちら。利用するCacheは、準備段階で作成したCacheにしました(デフォルトのものでもかまいませんが)。

  test("one-node simple-task") {
    withRemoteCache[String, String]("simpleCache") { cache =>
      val parameters = Map("name" -> "MyFirstTask").asJava
      val result = cache.execute[String]("oneNodeSimpleTask", parameters)
      result should be("Hello MyFirstTask!!, Cache[simpleCache] by infinispan1")
    }
  }

ポイントですが、タスクを呼び出す際のパラメーターは、java.util.Mapとして作成します。
※ここではScalaのMapからjava.util.Mapに変換していますが

      val parameters = Map("name" -> "MyFirstTask").asJava

あとは、RemoteCache#executeで、デプロイしたタスクの名前とともに、パラメーターを渡すとタスクを呼び出すことができます。

     val result = cache.execute[String]("oneNodeSimpleTask", parameters)

これで、初めてのRemote Taskの実装と確認が完了しました。

全Nodeでタスクを実行する

先ほどは、単一のNodeで動作するタスクを作成しましたが、今度は全Nodeでタスクを実行するように実装してみます。

ここから先は、Infinspan Serverが3 Node起動している前提とします。

作成したのは、こちら。内容的には、先ほどの単一Nodeで動作させるタスクとほぼ同じ実装です。
src/main/scala/org/littlewings/infinispan/task/AllNodeSimpleTask.scala

package org.littlewings.infinispan.task

import java.net.InetAddress

import org.infinispan.tasks.{ServerTask, TaskContext, TaskExecutionMode}

class AllNodeSimpleTask extends ServerTask[String] {
  private[task] var taskContext: TaskContext = _

  override def getName: String = "allNodeSimpleTask"

  override def setTaskContext(taskContext: TaskContext): Unit =
    this.taskContext = taskContext

  override def getExecutionMode: TaskExecutionMode = TaskExecutionMode.ALL_NODES

  override def call: String = {
    val cache = taskContext.getCache
    val parameters = taskContext.getParameters

    val name =
      if (parameters.isPresent) parameters.get.get("name").asInstanceOf[String]
      else "Task"

    val cacheName =
      if (cache.isPresent) cache.get.getName
      else "nonCache"

    s"Hello ${name}!!, Cache[${cacheName}] by ${InetAddress.getLocalHost.getHostName}"
  }
}

異なるのは、タスク名が異なるというのと

  override def getName: String = "allNodeSimpleTask"

getExecutionModeで、TaskExecutionMode.ALL_NODESを返すように実装しているということですね。

  override def getExecutionMode: TaskExecutionMode = TaskExecutionMode.ALL_NODES

で、作成したクラス名を追記します。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask

org.littlewings.infinispan.task.OneNodeSimpleTask
org.littlewings.infinispan.task.AllNodeSimpleTask

あとはデプロイして、テストコードで確認しましょう。

補足)
すでにJARファイルをデプロイした状態でデプロイする場合は、1度アンデプロイして再度デプロイするか、「--force」オプションを使用して
強制的にデプロイする方法があります。以下は、「--force」を使った例です。

$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="deploy --force /path/to/remote-task.jar"

作成したテストコードは、こちら。

  test("all-node simple-task") {
    withRemoteCache[String, String]("simpleCache") { cache =>
      val parameters = Map("name" -> "MyFirstTask").asJava
      val results = cache.execute[java.util.List[String]]("allNodeSimpleTask", parameters)
      results should (contain("Hello MyFirstTask!!, Cache[simpleCache] by infinispan1")
        and contain("Hello MyFirstTask!!, Cache[simpleCache] by infinispan2")
        and contain("Hello MyFirstTask!!, Cache[simpleCache] by infinispan3"))
    }
  }

単一Nodeの時とほぼ同じですが、全Nodeに対して呼び出しを行う場合は、呼び出し結果がListとして返却されます。

      val results = cache.execute[java.util.List[String]]("allNodeSimpleTask", parameters)

Listの要素数は、Node数と同じになります。

これで、全Nodeに対してタスクを実行することができました。

タスク内でCache内のエントリを扱う

先ほどまでは、タスク呼び出し時のパラメーターは使用していましたが、CacheについてはCacheの名前以外まったく触れていませんでした。

今度は、Cache内のエントリを扱うタスクを作成してみたいと思います。実行Nodeは、単一とします。
src/main/scala/org/littlewings/infinispan/task/CacheSimpleTask.scala

package org.littlewings.infinispan.task

import org.infinispan.Cache
import org.infinispan.tasks.{ServerTask, TaskContext}

class CacheSimpleTask extends ServerTask[String] {
  private[task] var taskContext: TaskContext = _

  override def getName: String = "cacheSimpleTask"

  override def setTaskContext(taskContext: TaskContext): Unit =
    this.taskContext = taskContext

  override def call: String = {
    val parameters = taskContext.getParameters.get
    val marshaller = taskContext.getMarshaller.get
    val cache = taskContext.getCache.get.asInstanceOf[Cache[Array[Byte], Array[Byte]]]

    val key = parameters.get("key")

    val keyAsBinary = marshaller.objectToByteBuffer(key)
    val value = marshaller.objectFromByteBuffer(cache.get(keyAsBinary))

    s"key = ${key}, value = ${value}"
  }
}

先ほどとちょっと変わったところとして、Marshallerが出てきました。Marshallerは、TaskContextから取得できます。

    val marshaller = taskContext.getMarshaller.get

というか、TaskContextから取得したMarshallerを使う必要があります。その他の、EmbeddedCacheManagerから取得できる
Marshallerを使用してはいけません。

特にCompatibility Modeを有効にしていないCacheの場合、Hot Rodで使う場合はキーも値もbyte配列が入っていることに
なっています。

    val cache = taskContext.getCache.get.asInstanceOf[Cache[Array[Byte], Array[Byte]]]

ここで、キーも値もStringと期待する場合、それぞれMarshallerを使ってbyte配列に変換、もしくはbyteから復元する必要があります。

    val keyAsBinary = marshaller.objectToByteBuffer(key)
    val value = marshaller.objectFromByteBuffer(cache.get(keyAsBinary))

キー自体は、呼び出し元からパラメーターとして教えてもらうことにします。

    val key = parameters.get("key")

これで、Cache内のエントリを使用するタスクを実装できました。

このタスクも、Service Providerの設定に追記して、デプロイします。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask

org.littlewings.infinispan.task.OneNodeSimpleTask
org.littlewings.infinispan.task.AllNodeSimpleTask
org.littlewings.infinispan.task.CacheSimpleTask

テストコードで確認。

  test("one-node cache-task") {
    withRemoteCache[String, String]("simpleCache") { cache =>
      val parameters = Map("key" -> "simpleKey").asJava
      cache.put("simpleKey", "simpleValue")

      val result = cache.execute[String]("cacheSimpleTask", parameters)
      result should be("key = simpleKey, value = simpleValue")
    }
  }

今度は、あらかじめRemoteCache#putでデータを登録しておきます。

      cache.put("simpleKey", "simpleValue")

それ以外は、まあふつうに…。

自作のクラスを使用する

ここまでは、CacheにString…Javaに標準で含まれているクラスしかタスク内で使用しませんでしたが、次は自分で作成した
クラスを使用してみたいと思います。

具体的には、書籍(Book)をお題に、以下のようなことをやってみます。

  • 単体のNodeで実行するタスクとする
  • Bookクラスをキャッシュの値として登録(キーはISBN:Stringとする)
  • Book自体をタスクのパラメーターとする(パラメーターとして自作のクラスが使えることの確認)
  • パラメーターとなったBookのISBNを使って、Cache内のBookを取得する(Cacheに登録された自作のクラスが使えることの確認)
  • 価格を2倍したBookのインスタンスを生成し、戻り値として返却する(戻り値に自作のクラスが使えることの確認)

お題となるBookクラスのソースコードは、こちら。このあとで説明しますが、都合によりJavaで書いています。
src/main/java/org/littlewings/infinispan/task/entity/Book.java

package org.littlewings.infinispan.task.entity;

import java.io.Serializable;

public class Book implements Serializable {
    private static final long serialVersionUID = 1L;

    private String isbn;
    private String title;
    private int price;

    public Book(String isbn, String title, int price) {
        this.isbn = isbn;
        this.title = title;
        this.price = price;
    }

    // getter、setterは省略
}

ServerTaskの実装自体は、先ほどMarshallerを使ったサンプルと、そう変わらない形で実装できます。
src/main/scala/org/littlewings/infinispan/task/BookPriceDoublingTask.scala

package org.littlewings.infinispan.task

import org.infinispan.Cache
import org.infinispan.tasks.{ServerTask, TaskContext}
import org.littlewings.infinispan.task.entity.Book

class BookPriceDoublingTask extends ServerTask[Book] {
  private[task] var taskContext: TaskContext = _

  override def getName: String = "bookPriceDoublingTask"

  override def setTaskContext(taskContext: TaskContext): Unit =
    this.taskContext = taskContext

  override def call: Book = {
    val parameters = taskContext.getParameters.get
    val marshaller = taskContext.getMarshaller.get
    val cache = taskContext.getCache.get.asInstanceOf[Cache[Array[Byte], Array[Byte]]]

    // parameter
    val parameterBook = parameters.get("target").asInstanceOf[Book]

    // query
    val keyAsBinary = marshaller.objectToByteBuffer(parameterBook.getIsbn)
    val cacheBook = marshaller.objectFromByteBuffer(cache.get(keyAsBinary)).asInstanceOf[Book]

    // new
    new Book(cacheBook.getIsbn, cacheBook.getTitle, cacheBook.getPrice * 2)
  }
}

Cacheはやはりbyte配列でのキー/値となっているので

    val cache = taskContext.getCache.get.asInstanceOf[Cache[Array[Byte], Array[Byte]]]

Marshallerでオブジェクトからbyte配列、byte配列からオブジェクトへの変換をしてあげる必要があります。

    // query
    val keyAsBinary = marshaller.objectToByteBuffer(parameterBook.getIsbn)
    val cacheBook = marshaller.objectFromByteBuffer(cache.get(keyAsBinary)).asInstanceOf[Book]

Service Providerの設定にも、作成したタスクを追加します。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask

org.littlewings.infinispan.task.OneNodeSimpleTask
org.littlewings.infinispan.task.AllNodeSimpleTask
org.littlewings.infinispan.task.CacheSimpleTask
org.littlewings.infinispan.task.BookPriceDoublingTask

あとはデプロイすればOK…と思いきや、そのようにしてタスクを実行すると、Server側でClassNotFoundExceptionを
見ることになります。

これは、MarshallerからBookクラスが見えないためです。これを回避するために、Infinispan Serverのモジュールとして
Bookクラスを追加します。

まず、Bookクラスのみを含めたJARファイルを作成します。JARファイルの名前は、「entity.jar」としましょう。

$ jar -cvf entity.jar -C classes org/littlewings/infinispan/task/entity/Book.class

このJARファイルを、Infinispan Serverの「modules/system/layers/base」ディレクトリ配下に登録します。
※ここから先のInfinispan Serverへの操作は、各Infinispan Serverに対してコマンドを実行します

まずはディレクトリ作成。「modules/system/layers/base/org/littlewings/task/entity/main」配下にJARファイルを
置くようにしましょう。

$ mkdir -p modules/system/layers/base/org/littlewings/task/entity/main

module.xmlを作成します。

$ vim modules/system/layers/base/org/littlewings/task/entity/main/module.xml

内容は、こんな感じで。

<?xml version="1.0" encoding="UTF-8"?>
<module xmlns="urn:jboss:module:1.3" name="org.littlewings.task.entity">
    <resources>
        <resource-root path="entity.jar"/>
    </resources>
</module>

続いて、「infinispan-commons」モジュールに、追加したモジュールを参照するように設定変更します。

$ vim modules/system/layers/base/org/infinispan/commons/main/module.xml

こんな感じで。JBoss Marshallerよりも前にある必要があった…はずです。

<?xml version="1.0" encoding="UTF-8"?>

<module xmlns="urn:jboss:module:1.3" name="org.infinispan.commons">
    <resources>
        <resource-root path="infinispan-commons.jar"/>
    </resources>

    <dependencies>
        <module name="javax.api"/>
        <module name="javax.transaction.api"/>
        <module name="org.littlewings.task.entity"/>
        <module name="org.jboss.logging"/>
        <module name="org.jboss.marshalling" services="import"/>
        <module name="sun.jdk"/>
    </dependencies>
</module>

ここまで実行したら、Infinispan Serverを再起動しましょう。

ここで、先ほどからデプロイしている「remote-task.jar」ファイル内に含まれているMANIFEST.MFファイルに、
以下の内容を追記する必要があります。

Dependencies: org.littlewings.task.entity

このため、sbt-assemblyの設定で、MANIFEST.MFに上記の内容が出力されるように設定を行います。

packageOptions in (Compile, packageBin) +=
    Package.ManifestAttributes("Dependencies" -> "org.littlewings.task.entity")

また、「remote-task.jar」にBookクラス自体が含まれてはいけません。同じくsbt-assemblyの設定で、「remote-task.jar」から
Bookクラスを除外するように設定します。

assemblyMergeStrategy in assembly := {
  case "org/littlewings/infinispan/task/entity/Book.class" => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

あとはタスクを含んだJARファイルをデプロイすればOKですが、今回はCacheを分けましょう。

新しいDistributed Cacheをひとつ追加します。設定は、最初に作成した「simpleCache」と同じで、「bookCache」というCacheを追加。
※各Infinispan Serverに対して、コマンドを実行します

$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="/subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=bookCache:add(configuration=simple)"

で、JARファイルをデプロイ後、テストコードで確認することができます。

  test("one-node include original-class") {
    withRemoteCache[String, Book]("bookCache") { cache =>
      val book = new Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4543)

      cache.put(book.getIsbn, book)

      val parameters = Map("target" -> book).asJava

      val resultBook = cache.execute[Book]("bookPriceDoublingTask", parameters)

      resultBook.getIsbn should be(book.getIsbn)
      resultBook.getTitle should be(book.getTitle)
      resultBook.getPrice should be(book.getPrice * 2)
    }
  }

Compatibility Modeを有効にした場合

先ほどのBookクラスを使用した例ですが、今度はCacheのCompatibility Modeを有効にした状態で確認してみましょう。

まずは、Compatibility Modeを有効にしたConfigurationとCacheを追加します。Cacheの名前は、「compatibilityBookCache」としました。
※設定は、Compatibility Mode以外は「simple」と同じです
※各Infinispan Serverに対して、コマンドを実行します

$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="/subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=compatibility:add(start=EAGER,mode=SYNC,owners=2)"
$ bin/ispn-cli.sh -u=test -p=testpassword -c --command="/subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=compatibility/compatibility=COMPATIBILITY:add(enabled=true)"

で、タスクを作成します。
src/main/scala/org/littlewings/infinispan/task/CompatibilityBookPriceDoublingTask.scala

package org.littlewings.infinispan.task

import org.infinispan.Cache
import org.infinispan.tasks.{ServerTask, TaskContext}
import org.littlewings.infinispan.task.entity.Book

class CompatibilityBookPriceDoublingTask extends ServerTask[Book] {
  private[task] var taskContext: TaskContext = _

  override def getName: String = "compatibilityBookPriceDoublingTask"

  override def setTaskContext(taskContext: TaskContext): Unit =
    this.taskContext = taskContext

  override def call: Book = {
    val parameters = taskContext.getParameters.get
    val marshaller = taskContext.getMarshaller.get
    val cache = taskContext.getCache.get.asInstanceOf[Cache[String, Array[Byte]]]

    // parameter
    val parameterBook = parameters.get("target").asInstanceOf[Book]

    // query
    val isbn = parameterBook.getIsbn
    val cacheBook = marshaller.objectFromByteBuffer(cache.get(isbn)).asInstanceOf[Book]

    // new
    new Book(cacheBook.getIsbn, cacheBook.getTitle, cacheBook.getPrice * 2)
  }
}

キーに対してMarshallerは不要になるのですが、値に対しては相変わらずMarshallerが必要だったりします…。

    val cache = taskContext.getCache.get.asInstanceOf[Cache[String, Array[Byte]]]

    // parameter
    val parameterBook = parameters.get("target").asInstanceOf[Book]

    // query
    val isbn = parameterBook.getIsbn
    val cacheBook = marshaller.objectFromByteBuffer(cache.get(isbn)).asInstanceOf[Book]

このあたりの挙動、実はHot Rod越しのRemote Scriptingと同じだったりします…。
http://d.hatena.ne.jp/Kazuhira/20151223/1450884828

うーん…。

こちらも、Service Providerの設定に追加してデプロイ。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask

org.littlewings.infinispan.task.OneNodeSimpleTask
org.littlewings.infinispan.task.AllNodeSimpleTask
org.littlewings.infinispan.task.CacheSimpleTask
org.littlewings.infinispan.task.BookPriceDoublingTask
org.littlewings.infinispan.task.CompatibilityBookPriceDoublingTask

テストコードは、そう変わりません。

  test("one-node include original-class, with Compatibility mode") {
    withRemoteCache[String, Book]("compatibilityBookCache") { cache =>
      val book = new Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4543)

      cache.put(book.getIsbn, book)

      val parameters = Map("target" -> book).asJava

      val resultBook = cache.execute[Book]("compatibilityBookPriceDoublingTask", parameters)

      resultBook.getIsbn should be(book.getIsbn)
      resultBook.getTitle should be(book.getTitle)
      resultBook.getPrice should be(book.getPrice * 2)
    }
  }

タスク内で、Distributed Stream APIを使う

最後に、タスク内でDistributed Stream APIを使ってみます。これを使うには、Compatibility Modeを有効にした
Cacheを使うのが無難な気がします…。

Distributed Cacheに対してDistributed Stream APIを使うので、タスクの実行Nodeは単体としましょう。

お題は、先ほど作成したBookのインスタンスを複数登録して、価格の合計を算出するタスクを作ってみます。
というわけで、Bookクラスを先ほどと同様、Infinispan Serverにモジュールとして登録していることを
前提とします。

で、作成したのがこちら。
src/main/scala/org/littlewings/infinispan/task/BookPriceSumTask.scala
package org.littlewings.infinispan.tas

import java.util.stream.{Collector, Collectors}

import org.infinispan.Cache
import org.infinispan.stream.{CacheCollectors, SerializableSupplier}
import org.infinispan.tasks.{ServerTask, TaskContext}
import org.littlewings.infinispan.task.entity.Book

class BookPriceSumTask extends ServerTask[Int] {
  private[task] var taskContext: TaskContext = _

  override def getName: String = "bookPriceSumTask"

  override def setTaskContext(taskContext: TaskContext): Unit =
    this.taskContext = taskContext

  override def call: Int = {
    val cache = taskContext.getCache.get.asInstanceOf[Cache[String, Book]]

    val stream = cache.entrySet.stream

    try {
      stream.map[Int](new java.util.function.Function[java.util.Map.Entry[String, Book], Int] with Serializable {
        override def apply(entity: java.util.Map.Entry[String, Book]): Int = entity.getValue.getPrice
      }).collect(CacheCollectors.serializableCollector[Int, Integer](new SerializableSupplier[Collector[Int, _, Integer]] {
        override def get: Collector[Int, _, Integer] = Collectors.summingInt[Int](i => Integer.valueOf(i))
      }))
    } finally {
      stream.close()
    }
  }
}

なんかですね、Distributed Stream APIを使った時は、Marshallerが不要みたいなんですけど、なんででしょう…。
かといって、MarshallerはSerializableではないですし、ここで渡しているFunctionやCollectorはSerializableである
必要があるので、もしも必要って言われるとそれはそれで困るのですが…。

型の記載がいろいろ面倒になっているのは、ScalaJavaのStream APIの相性の悪さが効いています…。

あと、この実装ならStream#mapToIntとかにすればいいんじゃ?と言われそうですが、やったらサポートしてないよって
言われました…。

java.lang.UnsupportedOperationException: Primitive delegate is not yet supported!

で、このタスクもService Providerの設定に追加して、デプロイ。
src/main/resources/META-INF/services/org.infinispan.tasks.ServerTask

org.littlewings.infinispan.task.OneNodeSimpleTask
org.littlewings.infinispan.task.AllNodeSimpleTask
org.littlewings.infinispan.task.CacheSimpleTask
org.littlewings.infinispan.task.BookPriceDoublingTask
org.littlewings.infinispan.task.CompatibilityBookPriceDoublingTask
org.littlewings.infinispan.task.BookPriceSumTask

テストコードは、こちら。

  test("one-node include original-class using Distributed Stream API") {
    withRemoteCache[String, Book]("compatibilityBookCache") { cache =>
      val books = Array(
        new Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4543),
        new Book("978-1785285332", "Getting Started With Hazelcast", 3533),
        new Book("978-1849519205", "Hibernate Search by Example", 3365),
        new Book("978-1783988181", "Mastering Redis", 6102),
        new Book("978-1491933664", "Cassandra: The Definitive Guide", 4637),
        new Book("978-1449344689", "MongoDB: The Definitive Guide", 3925),
        new Book("978-1449358549", "Elasticsearch: The Definitive Guide", 5951),
        new Book("978-1784399641", "Apache Solr Essentials", 3301),
        new Book("978-1449396107", "HBase: The Definitive Guide", 4680)
      )

      books.foreach(b => cache.put(b.getIsbn, b))

      val result = cache.execute[Integer]("bookPriceSumTask", new java.util.HashMap[String, AnyRef])
      result should be(40037)
    }
  }

タスクさえデプロイできていれば、あとは動かせるでしょう。

とりあえず、試してみたいことはざっと確認できました…。

まとめ

Infinispan Server上で任意の処理を動かす、Remote Task Executionを試してみました。

存在をちゃんと認識していませんでしたが、知っておくと観点としてはありなのかなぁとちょっと
思いました。

ここまで通すのに、だいぶハマりましたけど…。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-task