CLOVER🍀

That was when it all began.

InfinispanでRemote Scripting(JSR-223)

以前、Embedded ModeなCacheでScripting(JSR-223) in serverなる機能で遊んでみました。Infinispan上で、スクリプトを実行する機能です。

InfinispanのScripting機能(JSR-223)を試す - CLOVER

これの、Hot Rod版を試してみたいと思います。以前うまくいかないところがあってやめていたのですが、チュートリアルやドキュメントもできたようですので。

Remote Scripting Tutorial - Infinispan

Scripting

Hot Rodで実行するということは、サーバー側でスクリプトが動作して、結果がクライアントに返却されるという形になります。なんかストアドっぽいですね。
デフォルトはJavaScriptが動作するので、Java 8の場合Nashorn上で動作することになります。今のInfinispanはJava 8前提ですが。

なお、ソースを追っていて気付いたのですが、以前Embedded Mode試した時からもインターフェースがけっこう変わったんだなぁ、と。

まあ、今回はRemoteCacheで試していきましょう。

準備

とりあえず、Infinispan Serverがないと始まりません。ダウンロードして展開して、起動します。

$ infinispan-server-8.1.0.Final/bin/standalone.sh -c clustered.xml

クラスタ構成で起動。

が、後々のため、Compatibility Modeを有効にしたCacheも作ってみます。

$ infinispan-server-8.1.0.Final/bin/ispn-cli.sh -c
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=compatibilityCache:add(start=EAGER,mode=SYNC)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=compatibilityCache/compatibility=COMPATIBILITY:add(enabled=true)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=compatibilityCache:add(configuration=compatibilityCache)
{"outcome" => "success"}

Distributed Cacheとします。

で、再起動。

また、最初のNodeとは別に、もうひとつNodeを起動しておきます。

$ infinispan-server-8.1.0.Final/bin/standalone.sh -c clustered.xml -Djboss.socket.binding.port-offset=1000

クラスタ構成です。

ここまでで、Infinispan Serverの準備はおしまい。あとはプログラムです。

今回は、Scalaで書きます。sbtの設定は、以下の通り。
build.sbt

name := "remote-scripting"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.7"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-client-hotrod" % "8.1.0.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

とりあえず、Hot Rod Clientがあればいいようです。

テストコードの雛形

コードは、ScalaTestを使ったテストコードで書きます。

src/test/scala/org/littlewings/infinispan/scripting/ScriptingSpec.scala

package org.littlewings.infinispan.scripting

import java.util.Collections

import org.infinispan.client.hotrod.{RemoteCacheManager, RemoteCache}
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.scalatest.{FunSpec, Matchers}

import scala.io.Source

class ScriptingSpec extends FunSpec with Matchers {
  describe("Scripting Spec") {
    // ここに、テストを書く!
  }

  protected def readScript(path: String): String = {
    val is = Thread.currentThread.getContextClassLoader.getResourceAsStream(path)

    val source = Source.fromInputStream(is, "UTF-8")
    try {
      source.mkString
    } finally {
      source.close()
    }
  }

  protected def withRemoteCache[K, V](cacheName: String)(fun: RemoteCache[K, V] => Unit): Unit = {
    val manager =
      new RemoteCacheManager(new ConfigurationBuilder().addServer().host("localhost").port(11222).build())

    try {
      val cache = manager.getCache[K, V](cacheName)
      fun(cache)
      cache.clear()
      cache.stop()
    } finally {
      manager.stop()
    }
  }
}

RemoteCacheを使うためのメソッドと、クラスパス上からファイルを読むためのメソッドを用意。こちらを使って、プログラムを書いていきます。

最初のRemote Scripting

チュートリアルとドキュメントを見ながら、まずは短いスクリプトを書いて実行してみます。

Remote Scripting Tutorial - Infinispan

nstalling scripts

書いたのは、例に習ってこんな感じ。

    it("simple multiply scripting, using literal") {
      withRemoteCache[Int, Int]("namedCache") { cache =>
        val manager = cache.getRemoteCacheManager
        val scriptCache = manager.getCache[String, String]("___script_cache")

        scriptCache.put("simple.js", "a * b")

        val params = new java.util.HashMap[String, Int]
        params.put("a", 5)
        params.put("b", 6)

        val result = cache.execute[Double]("simple.js", params)

        result should be(30)
      }
    }

メインとなるCacheは、まずは最初から定義されている「namedCache」を使ってみます。

      withRemoteCache[Int, Int]("namedCache") { cache =>

RemoteCacheManagerから、「___script_cache」という名前のCacheを取得します。

        val manager = cache.getRemoteCacheManager
        val scriptCache = manager.getCache[String, String]("___script_cache")

これ、Embedded Modeだと定数が使えるんですけどねぇ…。そのためにinfinispan-scriptingを引き込んでもいいのですが…。

で、このRemoteCacheに、「simple.js」という名前でJavaScriptのコードを文字列として登録します。

        scriptCache.put("simple.js", "a * b")

このaとbという変数に対応する値をMapに登録して

        val params = new java.util.HashMap[String, Int]
        params.put("a", 5)
        params.put("b", 6)

実行。

        val result = cache.execute[Double]("simple.js", params)

        result should be(30)

掛け算しているので、30という結果が得られました。JavaScriptが実行できたようです。

ちょっとファイルとして作ってみる

今度は、あらかじめ用意していたクラスパス上からファイルを読むメソッドを使って、スクリプトをRemoteCacheに登録して実行してみます。

用意したJavaScriptはこちら。
src/test/resources/scripts/noBindings.js

var str1 = "Hello";
var str2 = "Scripting";

str1 + " "  + str2 + "!!";

実行側のコードは、こんな感じ。

    it("using script, defined variables") {
      withRemoteCache[Int, Int]("namedCache") { cache =>
        val manager = cache.getRemoteCacheManager
        val scriptCache = manager.getCache[String, String]("___script_cache")

        scriptCache.put("noBindings.js", readScript("scripts/noBindings.js"))

        val result = cache.execute[String]("noBindings.js", Collections.emptyMap[String, String])

        result should be("Hello Scripting!!")
      }
    }

特に与えるパラメータはないので、RemoteCache#executeの第2引数はemptyMapにしています。

結果としては、文字列連結した結果が得られましたね。

RemoteCacheに登録されたエントリを使ってみる

今度は、スクリプトの中でRemoteCacheに登録したエントリを使ってみます。

用意したスクリプトは、こちら。
src/test/resources/scripts/bindings.js

var cachedValue3 = cache.get(marshaller.objectToByteBuffer("key3"));
var cachedValue5 = cache.get(marshaller.objectToByteBuffer("key5"));

var value1 = marshaller.objectFromByteBuffer(cachedValue3) * a;
var value2 = marshaller.objectFromByteBuffer(cachedValue5) * b;
java.lang.Double.valueOf(value1 + value2).intValue();

a、bはRemoteCache#executeの第2引数として与えるものですが、cacheとmarshallerというものが現れています。

スクリプト内で、暗黙的に使える4つの変数があって(ちょっと嘘入っています)、それが載っているのがこちらのドキュメントです。

Script bindings

定義箇所としては、こちら。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/SystemBindings.java#L10

これらを使ったコードは、こちらになります。

    it("using script, pass variables") {
      withRemoteCache[String, Int]("namedCache") { cache =>
        val manager = cache.getRemoteCacheManager
        val scriptCache = manager.getCache[String, String]("___script_cache")

        scriptCache.put("bindings.js", readScript("scripts/bindings.js"))

        (1 to 10).foreach(i => cache.put(s"$key$i", i))

        val params = new java.util.HashMap[String, Int]
        params.put("a", 5)
        params.put("b", 8)

        val result = cache.execute[Int]("bindings.js", params)

        result should be(55)
      }
    }

ちゃんとRemoteCacheのエントリと、引数を使いつつ演算できました、と。

ここで紹介された4つの変数ですが、それぞれ以下になります。

名前 意味
cache スクリプトを実行しているCache(EmbededなCache)
marshaller Cacheのデータをmarshall/unmarshallするために必要なMarshaller(org.infinispan.commons.marshall.jboss.GenericJBossMarshaller)
cacheManager CacheのオーナーとなるCacheManager(EmbeddedCacheManager)
scriptingManager スクリプトを実行しているScriptManager

なお、この4つが全部使えるのは、Local Cacheのみです。Distributed Cacheだと、cacheのみになります…。これが後々、えらいことに…。

Local Modeと宣言してスクリプトを書く

ところで、スクリプトにはコメントとしてメタデータを埋め込むことができます。

Script metadata

それぞれ、以下の意味になります。

名前 意味
mode スクリプトの実行モード(localまたはdistributed、デフォルトはlocal)
language スクリプトの言語(デフォルトはjavascript
extension スクリプトを実行する際の言語を指定するもうひとつの方法で、拡張子で指定(デフォルトはjs)
role スクリプトを実行するためのロール

modeはそれ以外にもmapperやreducerがあったりしますが、これらはInfinispan 9.0で削除される予定(Distributed Streamsに置き換えられる)みたいなので、端折ります。

modeの定義は、このあたりに書いてあります。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/ExecutionMode.java#L9

メタデータをパースしている箇所は、このあたりになります。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/ScriptMetadataParser.java#L21

つまり、先ほどのスクリプトメタデータ入りで書くと、このようになります。
src/test/resources/scripts/localBindings.js

// mode=local,language=javascript
var cachedValue3 = cache.get(marshaller.objectToByteBuffer("key3"));
var cachedValue5 = cache.get(marshaller.objectToByteBuffer("key5"));

var value1 = marshaller.objectFromByteBuffer(cachedValue3) * a;
var value2 = marshaller.objectFromByteBuffer(cachedValue5) * b;
java.lang.Double.valueOf(value1 + value2).intValue();

実行コード。

    it("using script, pass variables as local") {
      withRemoteCache[String, Int]("namedCache") { cache =>
        val manager = cache.getRemoteCacheManager
        val scriptCache = manager.getCache[String, String]("___script_cache")

        scriptCache.put("localBindings.js", readScript("scripts/localBindings.js"))

        (1 to 10).foreach(i => cache.put(s"$key$i", i))

        val params = new java.util.HashMap[String, Int]
        params.put("a", 5)
        params.put("b", 8)

        val result = cache.execute[Int]("localBindings.js", params)

        result should be(55)
      }
    }

Compatibility Modeと合わせてみる

ここまで、スクリプト内でキーや値を取得する際に、marshallerがつきまとっていましたが、これを減らせないかな?という意味でCompatibility Modeを有効にしたCacheを使って試してみました。

先に実行コードから。

    it("using script in compatibility mode, pass variables as local") {
      withRemoteCache[String, Int]("compatibilityCache") { cache =>
        val manager = cache.getRemoteCacheManager
        val scriptCache = manager.getCache[String, String]("___script_cache")

        scriptCache.put("localCompatibilityBindings.js", readScript("scripts/localCompatibilityBindings.js"))

        (1 to 10).foreach(i => cache.put(s"$key$i", i))

        val params = new java.util.HashMap[String, Int]
        params.put("a", 5)
        params.put("b", 8)

        val result = cache.execute[Int]("localCompatibilityBindings.js", params)

        result should be(55)
      }
    }

使うRemoteCacheが変わっただけで、先ほどと同じです。

      withRemoteCache[String, Int]("compatibilityCache") { cache =>

で、スクリプトの方は…
src/test/resources/scripts/localCompatibilityBindings.js

// mode=local,language=javascript
var cachedValue3 = cache.get("key3");
var cachedValue5 = cache.get("key5");

var value1 = marshaller.objectFromByteBuffer(cachedValue3) * a;
var value2 = marshaller.objectFromByteBuffer(cachedValue5) * b;
java.lang.Double.valueOf(value1 + value2).intValue();

なんか、キーだけmarshallerが要らなくなりましたが、値の方はなぜか必要でした。これは…。

まあ、いいか…。

Distributed Modeで実行してみる

今度は、クラスタを構成していることですし、Distributed Modeで実行してみます。

いやー、ハマりました。

実行コードは単純なので、先に載せておきます。

    it("using script, as distributed") {
      withRemoteCache[String, Int]("namedCache") { cache =>
        val manager = cache.getRemoteCacheManager
        val scriptCache = manager.getCache[String, String]("___script_cache")

        scriptCache.put("distributedBindings.js", readScript("scripts/distributedBindings.js"))

        (1 to 10).foreach(i => cache.put(s"$key$i", i))

        // distributedの場合は、パラメーターを渡しても無視される…
        val result = cache.execute[java.util.List[Int]]("distributedBindings.js", Collections.emptyMap[String, String])

        result should contain theSameElementsAs List(8, 8)
      }
    }

なんかコメントで書いていますが、パラメーターを渡してもこの実行形態の場合、無視されてしまいます。

そして、スクリプト側。
src/test/resources/scripts/distributedBindings.js

// mode=distributed,language=javascript
var scriptCache = cache.getCacheManager().getCache("___script_cache");
var marshaller = scriptCache.getCacheConfiguration().compatibility().marshaller();

var cachedValue3 = cache.get(marshaller.objectToByteBuffer("key3"));
var cachedValue5 = cache.get(marshaller.objectToByteBuffer("key5"));

var value1 = marshaller.objectFromByteBuffer(cachedValue3);
var value2 = marshaller.objectFromByteBuffer(cachedValue5);
java.lang.Double.valueOf(value1 + value2).intValue();

メタデータとして、distributedなことを宣言します。

// mode=distributed,language=javascript

それはいいのですが、なぜかMarshallerをScriptCacheから引っこ抜いています…。

var scriptCache = cache.getCacheManager().getCache("___script_cache");
var marshaller = scriptCache.getCacheConfiguration().compatibility().marshaller();

いや、他にうまい方法がなくて…良くない感じですが…。

とはいえ、marshallerは必要なわけでして。

で、先ほどもちらっと書きましたが、Distributed Modeの場合、使える変数はcacheのみのようです。

これは、以下あたりの実装を見ているとなんとなくわかります。

まず、この仕組みの正体は、DistributedExecutorServiceです。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/DistributedRunner.java#L35

そこで呼び出されるCallableとして、スクリプトは実行されます。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/DistributedScript.java#L19

あ、実行場所は、submitEverywhereなので全Nodeですね。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/DistributedRunner.java#L37

このため、Callableに渡すパラメーターはSerializableである必要がありますが、スクリプトの引数となるCacheScriptBindingsはSerializableではありません。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/CacheScriptBindings.java

まあ、JSR-223のBindings系もSerializableではないですからねぇ、RemoteCache#executeの引数が渡せないのは、これが理由なのでしょう…。

DistributedScriptでも、バッチリtransientて書いてますからね!
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/DistributedScript.java#L22

その他、cache以外が使えない理由ですが、同じくこのDistributedScriptにあります。スクリプトに渡す引数として、cacheとinputKeysしか持っていないのです。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/DistributedScript.java#L38

submitEverywhereの追加引数はないので、inputKeysも空となり、実質cacheのみです。

とはいえ、marshallerはないと困ります。Local Modeの場合は、ScriptingManagerの実装が設定してくれています。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/ScriptingManagerImpl.java#L158

その大元は、Hot Rod Serverです。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder2x.scala#L322

Distributed Modeは先の理由でmarshallerが取得できないので、仕方なくScriptCacheから引き抜くことにしました。
https://github.com/infinispan/infinispan/blob/8.1.0.Final/scripting/src/main/java/org/infinispan/scripting/impl/ScriptingManagerImpl.java#L83

それが、こんなことが書かれていた理由です。

var scriptCache = cache.getCacheManager().getCache("___script_cache");
var marshaller = scriptCache.getCacheConfiguration().compatibility().marshaller();

こんなこと書くくらいなら、Marshallerの正体はGenericJBossMarshallerって分かっているので

var marshaller = new org.infinispan.commons.marshall.jboss.GenericJBossMarshaller();

とすればよいかなと思ったのですが、クラスタ構成の場合は追加NodeでClassNotFoundExceptionになってしまい、うまくいきませんでした…。

これでなんとか動くとはいえ、かなり微妙な方法ですし、それも使わなかったらCacheの中身も使えないことに…。

Distributed ModeでCompatibitility Mode

最後に、ものは試しとDistributed ModeかつComatibility ModeなCacheで試してみました。

実行コードはこちら。

    it("using script in compatibility mode, as distributed") {
      withRemoteCache[String, Int]("compatibilityCache") { cache =>
        val manager = cache.getRemoteCacheManager
        val scriptCache = manager.getCache[String, String]("___script_cache")

        scriptCache.put("distributedCompatibilityBindings.js", readScript("scripts/distributedCompatibilityBindings.js"))

        (1 to 10).foreach(i => cache.put(s"$key$i", i))

        val result = cache.execute[java.util.List[Int]]("distributedCompatibilityBindings.js", Collections.emptyMap[String, String])

        result should contain theSameElementsAs List(8, 8)
      }
    }

対するスクリプトは、こちら。
src/test/resources/scripts/distributedCompatibilityBindings.js

// mode=distributed,language=javascript
var scriptCache = cache.getCacheManager().getCache("___script_cache");
var marshaller = scriptCache.getCacheConfiguration().compatibility().marshaller();

var cachedValue3 = cache.get(marshaller.objectToByteBuffer("key3"));
var value1;
if (cachedValue3) {
    value1 = marshaller.objectFromByteBuffer(cachedValue3);
} else {
    cachedValue3 = cache.get("key3");
    value1 = cachedValue3;
}

var cachedValue5 = cache.get(marshaller.objectToByteBuffer("key5"));
var value2;
if (cachedValue5) {
    value2 = marshaller.objectFromByteBuffer(cachedValue5);
} else {
    cachedValue5 = cache.get("key5");
    value2 = cachedValue5;
}

java.lang.Double.valueOf(value1 + value2).intValue();

なんか、微妙なことになりました…。

Nodeによって、Marshallerの処理が必要だったり不要だったりしてですね…。なんか変な結果になりまして…。

Local Mode&Compatibility Modeな時もあれって感じだったので、ここはいいかなぁと。

まとめ

Infinispanで、Remote Scriptingを試してみました。

シリアライズの関係だったり、既存のDistributedExecutorServiceを使ったりの関係からか、特にDistributed Modeの時はおやって感じになることが多いかもです。

どうなのでしょう、単純にスクリプトを各Nodeで実行できる基盤と捉えた方がよいのでしょうか。引数渡せなかったり、Marshallerが使えなかったりすると、全Nodeで同じ仕事をすることになりそうな…。

あと、これを使う時にCompatibility Modeを有効にするのはやめましょう、と。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-scripting