CLOVER🍀

That was when it all began.

Infinispan Serverの、Hot RodとRESTのEndpointをコンパチにする

Infinispan 5.3からの機能なのですが…以前ちょっと諦めていた、InfinispanでEmbedded Cacheや各Remote Endpoint(Hot Rod、REST、Memcached)をコンパチにする機能を試してみました。

33. Interoperability between Embedded and Remote Server Endpoints
http://infinispan.org/docs/7.2.x/user_guide/user_guide.html#_interoperability_between_embedded_and_remote_server_endpoints

Compatibility Modeといいます。これを使用すると、通常一緒には使えないEmbedded ModeのCacheとHot RodのCache、Hot RodのCacheとRESTのCacheを相互運用的に?使えるのだとか。

Hot RodのCacheにエントリを入れて、同じ名前のRESTのCacheからエントリを取り出すみたいな感じですね。

正直なところ、まだEmbedded CacheとRemote Cacheは挫折しそうな感じだったので、まずはInfinispan ServerでHot RodとRESTをコンパチにしてみました。

このエントリを書くにあたって、このあたりの情報を参考にしています。

https://github.com/infinispan/infinispan/tree/master/integrationtests/compatibility-mode-it/src/test/java/org/infinispan/it/compatibility
https://github.com/jboss-developer/jboss-jdg-quickstarts/blob/master/rapid-stock-market/README.md

まあ、テストコードとJDGのサンプルなのですが。

準備

用意としては、アプリケーションのコードとInfinispan Serverが必要です。

まずはビルド定義。
build.sbt

name := "remote-hotrod-rest-compatibility"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.6"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := true

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-client-hotrod" % "7.2.1.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.jboss.resteasy" % "resteasy-client" % "3.0.11.Final",
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

RESTのクライアントは、JAX-RS Client APIとしましょう。

続いて、Infinispan Serverをダウンロードしてきて

http://infinispan.org/download/

適当な場所に展開、起動します。

$ unzip infinispan-server-7.2.1.Final-bin.zip

起動。

$ infinispan-server-7.2.1.Final/bin/standalone.sh

Compatibility Modeを有効にしたCacheを定義する

ドキュメントに沿うと、異なる種類のCacheを相互運用するには、Compatibility Modeの設定をする必要があるようです。

デフォルトのCacheの定義は、こうなので
infinispan-server-7.2.1.Final/standalone/configuration/standalone.xmlより抜粋

            <cache-container name="local" default-cache="default" statistics="true">
                <local-cache name="default" start="EAGER">
                    <locking acquire-timeout="30000" concurrency-level="1000" striping="false"/>
                    <transaction mode="NONE"/>
                </local-cache>
                <local-cache name="memcachedCache" start="EAGER">
                    <locking acquire-timeout="30000" concurrency-level="1000" striping="false"/>
                    <transaction mode="NONE"/>
                </local-cache>
                <local-cache name="namedCache" start="EAGER"/>
            </cache-container>

ここにCacheを追加することにします。

管理CLIを起動。

$ infinispan-server-7.2.1.Final/bin/jboss-cli.sh -c

Compatibility Modeを有効にしたCacheを追加して

[standalone@localhost:9990 /] /subsystem=infinispan/cache-container=local/local-cache=compatibilityCache:add(start=EAGER)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=infinispan/cache-container=local/local-cache=compatibilityCache/compatibility=COMPATIBILITY:add(enabled=true)
{"outcome" => "success"}

reload。

[standalone@localhost:9990 /] reload

Compatibility Modeを有効にしたCacheの名前は、「compatibilityCache」としました。

ここまでで、最初の準備としてはOKです。

コンパチなCacheを使ってみる

それでは、Hot RodとRESTのCacheを相互に使ってみます。

まずは、テストコードの雛形を用意。
src/test/scala/org/littlewings/infinispan/compatibility/HotRodRestCompatibilitySpec.scala

package org.littlewings.infinispan.compatibility

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import javax.ws.rs.client.{ClientBuilder, Entity}
import javax.ws.rs.core.Response

import org.infinispan.client.hotrod.RemoteCacheManager
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.infinispan.client.hotrod.exceptions.HotRodClientException
import org.scalatest.FunSpec
import org.scalatest.Matchers._

class HotRodRestCompatibilitySpec extends FunSpec {
  describe("Hot Rod & REST Compatibility Spec") {
    // ここに、テストを書く
  }
}

比較のために、デフォルトで用意されている、特に何も設定されていない「namedCache」も利用してみます。

Compatibility Modeが無効なCacheを使う

というわけで、「namedCache」から。Hot Rod、REST双方にアクセスできる手段を用意。

    it("simple cache, compatibility disabled") {
      val manager = new RemoteCacheManager(new ConfigurationBuilder().addServers("localhost:11222").build)
      val cache = manager.getCache[String, Any]("namedCache")

      val client = ClientBuilder.newBuilder.build

      try {
        // ここで、Cacheを利用する
      } finally {
        cache.clear()
        cache.stop()
        manager.stop()

        client.close()
      }

Hot Rod Clientでエントリを登録してみます。

        cache.put("key1", "value1")
        cache.get("key1") should be("value1")

これを、REST経由で取得すると…

        val getResponse =
          client
            .target("http://localhost:8080/rest/namedCache/key1")
            .request
            .get

        getResponse.getStatus should be(Response.Status.NOT_FOUND.getStatusCode)
        getResponse.readEntity(classOf[String]) should be("")
        getResponse.close()

NOT FOUNDですね。

反対に、RESTでエントリを登録して

        val putResponse =
          client
            .target("http://localhost:8080/rest/namedCache/key2")
            .request
            .put(Entity.text("value2"))

Hot Rodで見ると

        putResponse.getStatus should be(Response.Status.OK.getStatusCode)
        putResponse.close()

        cache.get("key2").asInstanceOf[String] should be(null)

こちらからも、エントリが取得できないようです。

Compatibility Modeを有効にしたCacheを使う

それでは、今度はCompatibility Modeを有効にした「compatibilityCache」を使ってみます。

    it("simple cache, compatibility enabled") {
      val manager = new RemoteCacheManager(new ConfigurationBuilder().addServers("localhost:11222").build)
      val cache = manager.getCache[String, Any]("compatibilityCache")

      val client = ClientBuilder.newBuilder.build

      try {
        // ここで、Cacheを利用する
      } finally {
        cache.clear()
        cache.stop()
        manager.stop()

        client.close()
      }
    }

先ほどのコードテンプレートから、Cacheの名前が変わっただけですね。

Hot Rodでエントリを登録。

        cache.put("key1", "value1")
        cache.get("key1") should be("value1")

これをRESTで取得すると…

        val getResponse =
          client
            .target("http://localhost:8080/rest/compatibilityCache/key1")
            .request
            .get

        getResponse.getStatus should be(Response.Status.OK.getStatusCode)
        getResponse.readEntity(classOf[String]) should be("value1")
        getResponse.close()

無事に取れました!

続いて、RESTで登録してHot Rodで取得してみます。

        val putResponse =
          client
            .target("http://localhost:8080/rest/compatibilityCache/key2")
            .request
            .put(Entity.text("value2"))

        putResponse.getStatus should be(Response.Status.OK.getStatusCode)
        putResponse.close()

        new String(cache.get("key2").asInstanceOf[Array[Byte]]) should be("value2")

こちらもOKでした!…けど、よく見るとHot Rodの方はbyteの配列で取得できたことになっています。

というか、RESTで使う時には、内部では値がbyte配列ってことになっているからですね。

https://github.com/infinispan/infinispan/blob/7.2.1.Final/server/rest/src/main/scala/org/infinispan/rest/RestCacheManager.scala#L20

今回用意した例で、Cacheの型パラメーターがAnyになっているのは、これが理由です。

      val cache = manager.getCache[String, Any]("compatibilityCache")

とはいえ、このままだとCacheエントリを登録するEndpointで型が変わってしまうので、厳しいです。

そこで、リクエストするMime-Typeを「application/x-java-serialized-object」に変更します。
※application/x-java-serialized-object、知りませんでした…
https://github.com/infinispan/infinispan/blob/7.2.1.Final/integrationtests/compatibility-mode-it/src/test/java/org/infinispan/it/compatibility/EmbeddedRestHotRodTest.java#L128

参考は、テストコードでした。要は、Javaオブジェクトをシリアライズして送受信したいと。

それでは、Mime-Typeを変えて再度テスト。ここからは、テストコードはケース単位に載せていきます。

    it("simple cache, compatibility enabled, as Java Serialized Object") {
      val manager = new RemoteCacheManager(new ConfigurationBuilder().addServers("localhost:11222").build)
      val cache = manager.getCache[String, String]("compatibilityCache")

      val client = ClientBuilder.newBuilder.build

      try {
        cache.put("key1", "value1")
        cache.get("key1") should be("value1")

        val getResponse =
          client
            .target("http://localhost:8080/rest/compatibilityCache/key1")
            .request("application/x-java-serialized-object")
            .get

        getResponse.getStatus should be(Response.Status.OK.getStatusCode)
        getResponse.readEntity(classOf[String]) should be("value1")
        getResponse.close()

        val putResponse =
          client
            .target("http://localhost:8080/rest/compatibilityCache/key2")
            .request("application/x-java-serialized-object")
            .put(Entity.entity(javaSerialize("value2"), "application/x-java-serialized-object"))

        putResponse.getStatus should be(Response.Status.OK.getStatusCode)
        putResponse.close()

        cache.get("key2") should be("value2")
      } finally {
        cache.clear()
        cache.stop()
        manager.stop()

        client.close()
      }
    }

今度はOKでした!

これで、Hot RodとRESTのどちらからCacheエントリを登録しても、同じ型で取り出せます。これなら相互に使えますね!

ユーザー定義の型を使う

先ほどの例は、Java標準のStringクラスを使用してCacheへの登録/参照を行いました。

今度は、自分でシリアライズ可能なクラスを定義して、それをCacheに登録してやりとりしてみましょう。この後の都合上、ここだけはソースコードJavaで書きました。
src/main/java/org/littlewings/infinispan/compatibility/Person.java

package org.littlewings.infinispan.compatibility;

import java.io.Serializable;
import java.util.Objects;

public class Person implements Serializable {
    private String firstName;
    private String lastName;

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName);
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof Person) {
            Person p = (Person) other;
            return Objects.equals(firstName, p.firstName) && Objects.equals(lastName, p.lastName);
        } else {
            return false;
        }
    }
}

また、RESTでの送受信の際にJavaでのシリアライズ・デシリアライズが必要になるので、テストクラスにメソッドを追加。

  private def javaDeserialize[T](binary: Array[Byte]): T =
    new ObjectInputStream(new ByteArrayInputStream(binary)).readObject.asInstanceOf[T]

  private def javaSerialize(target: Any): Array[Byte] = {
    val baos = new ByteArrayOutputStream
    val oos = new ObjectOutputStream(baos)
    oos.writeObject(target)
    oos.flush()
    baos.toByteArray
  }
Compatibility Modeが無効なCacheを使う

一応、こちらも試しておきます。

    it("user defined class cache, compatibility disabled") {
      val manager = new RemoteCacheManager(new ConfigurationBuilder().addServers("localhost:11222").build)
      val cache = manager.getCache[String, Person]("namedCache")

      val client = ClientBuilder.newBuilder.build

      try {
        cache.put("key1", new Person("カツオ", "磯野"))
        cache.get("key1") should be(new Person("カツオ", "磯野"))

        val getResponse =
          client
            .target("http://localhost:8080/rest/namedCache/key1")
            .request("application/x-java-serialized-object")
            .get

        getResponse.getStatus should be(Response.Status.NOT_FOUND.getStatusCode)
        getResponse.close()

        val putResponse =
          client
            .target("http://localhost:8080/rest/namedCache/key2")
            .request("application/x-java-serialized-object")
            .put(Entity.entity(javaSerialize(new Person("ワカメ", "磯野")), "application/x-java-serialized-object"))

        putResponse.getStatus should be(Response.Status.OK.getStatusCode)
        putResponse.close()

        cache.get("key2") should be(null)
      } finally {
        cache.clear()
        cache.stop()
        manager.stop()

        client.close()
      }
    }

まあ、先ほどと結果は一緒ですね。

Compatibility Modeを有効にしたCacheを使う

それでは、続いてCompatibility Modeを有効にしたCacheを使ってみます。

    it("user defined class cache, compatibility enabled") {
      val manager = new RemoteCacheManager(new ConfigurationBuilder().addServers("localhost:11222").build)
      val cache = manager.getCache[String, Person]("compatibilityCache")

      val client = ClientBuilder.newBuilder.build

      try {
        a[HotRodClientException] should be thrownBy cache.put("key1", new Person("カツオ", "磯野"))
      } finally {
        cache.clear()
        cache.stop()
        manager.stop()

        client.close()
      }
    }

なんと、こちらはHot RodのCacheに登録しようとしたところでエラーになります。

この時、Infinispan Server側のログには、以下のようなスタックトレースが出力されています。

00:09:36,809 ERROR [org.infinispan.interceptors.InvocationContextInterceptor] (HotRodServerWorker-17-10) ISPN000136: Execution error: java.lang.ClassNotFoundException: org.littlewings.infinispan.compatibility.Person from [Module "org.infinispan.commons:main" from local module loader @67117f44 (finder: local module finder @5d3411d (roots: /xxxxx/infinispan-server-7.2.1.Final/modules,/xxxxx/infinispan-server-7.2.1.Final/modules/system/layers/base))]
	at org.jboss.modules.ModuleClassLoader.findClass(ModuleClassLoader.java:213) [jboss-modules.jar:1.3.3.Final]
	at org.jboss.modules.ConcurrentClassLoader.performLoadClassUnchecked(ConcurrentClassLoader.java:459) [jboss-modules.jar:1.3.3.Final]
	at org.jboss.modules.ConcurrentClassLoader.performLoadClassChecked(ConcurrentClassLoader.java:408) [jboss-modules.jar:1.3.3.Final]
	at org.jboss.modules.ConcurrentClassLoader.performLoadClass(ConcurrentClassLoader.java:389) [jboss-modules.jar:1.3.3.Final]
	at org.jboss.modules.ConcurrentClassLoader.loadClass(ConcurrentClassLoader.java:134) [jboss-modules.jar:1.3.3.Final]
	at java.lang.Class.forName0(Native Method) [rt.jar:1.8.0_45]
	at java.lang.Class.forName(Class.java:348) [rt.jar:1.8.0_45]
        〜省略〜

自分で作成したクラスが見つかりませんよっと。

Compatibility Modeを無効にしたCacheには普通に登録できるので、となるとCompatibility Modeを有効にするとサーバー側でもデシリアライズするようになるということでしょうね。

困りました。

なら、Infinispan Server(というかWildFly)のモジュールとして今回作成したクラスをインストールすればいいのかな?と思い、チャレンジしてみました。

まずは、今回作成したクラスをJARにパッケージング。

> package

中身は、ホントにこれだけ。

$ jar -tvf target/scala-2.11/remote-hotrod-rest-compatibility_2.11-0.0.1-SNAPSHOT.jar 
   337 Thu May 21 00:14:20 JST 2015 META-INF/MANIFEST.MF
     0 Thu May 21 00:14:20 JST 2015 org/
     0 Thu May 21 00:14:20 JST 2015 org/littlewings/
     0 Thu May 21 00:14:20 JST 2015 org/littlewings/infinispan/
     0 Thu May 21 00:14:20 JST 2015 org/littlewings/infinispan/compatibility/
  1143 Sat May 16 00:05:52 JST 2015 org/littlewings/infinispan/compatibility/Person.class

いったん、Infinispan Serverを停止します。
※モジュールだけディレクトリに配置しても、再起動しないと反映されませんでした

このJARファイルを配置するディレクトリを、Infinispan Server内に作成します。

$ mkdir -p infinispan-server-7.2.1.Final/modules/system/layers/base/org/littlewings/main

今回は、「org/littlewings/main」としました。

このディレクトリに、先ほど作成したJARファイルをコピーします。バージョン番号などは、不要なので削ります。

$ cp /path/to/target/scala-2.11/remote-hotrod-rest-compatibility_2.11-0.0.1-SNAPSHOT.jar infinispan-server-7.2.1.Final/modules/system/layers/base/org/littlewings/main/remote-hotrod-rest-compatibility.jar

そして、module.xmlを作成します。

<?xml version="1.0" encoding="UTF-8"?>
<module xmlns="urn:jboss:module:1.3" name="org.littlewings">
    <resources>
        <resource-root path="remote-hotrod-rest-compatibility.jar"/>
    </resources>
</module>

このmodule.xmlを、作成したJARファイルと同じディレクトリに配置。

$ cp /path/to/src/main/compatibility-module/module.xml infinispan-server-7.2.1.Final/modules/system/layers/base/org/littlewings/main/

最後に、infinispan-commonsのmodule.xmlを修正します。

$ vim infinispan-server-7.2.1.Final/modules/system/layers/base/org/infinispan/commons/main/module.xml

dependenciesタグの中に、先ほど追加したmodule.xmlのname属性の値を設定します。

    <dependencies>
        <module name="javax.api"/>
        <module name="javax.transaction.api"/>
        <module name="javax.xml.bind.api"/>
        <module name="org.littlewings"/>
        <module name="org.jboss.logging"/>
        <module name="org.jboss.marshalling" services="import"/>
        <module name="sun.jdk"/>
    </dependencies>

追加位置は、「org.jboss.marshalling」の前でないとダメでした。というか、このファイルを修正しなくてはいけないんでしょうか…?

ここまで済んだら、Infinispan Serverを起動。

$ infinispan-server-7.2.1.Final/bin/standalone.sh

すると、今度はテストコードが通るようになります。

    it("user defined class cache, compatibility enabled") {
      val manager = new RemoteCacheManager(new ConfigurationBuilder().addServers("localhost:11222").build)
      val cache = manager.getCache[String, Person]("compatibilityCache")

      val client = ClientBuilder.newBuilder.build

      try {
        // a[HotRodClientException] should be thrownBy cache.put("key1", new Person("カツオ", "磯野"))

        cache.put("key1", new Person("カツオ", "磯野"))
        cache.get("key1") should be(new Person("カツオ", "磯野"))

        val getResponse =
          client
            .target("http://localhost:8080/rest/compatibilityCache/key1")
            .request("application/x-java-serialized-object")
            .get

        getResponse.getStatus should be(Response.Status.OK.getStatusCode)
        javaDeserialize[Person](getResponse.readEntity(classOf[Array[Byte]])) should be(new Person("カツオ", "磯野"))
        getResponse.close()

        val putResponse =
          client
            .target("http://localhost:8080/rest/compatibilityCache/key2")
            .request("application/x-java-serialized-object")
            .put(Entity.entity(javaSerialize(new Person("ワカメ", "磯野")), "application/x-java-serialized-object"))

        putResponse.getStatus should be(Response.Status.OK.getStatusCode)
        putResponse.close()

        cache.get("key2") should be(new Person("ワカメ", "磯野"))
      } finally {
        cache.clear()
        cache.stop()
        manager.stop()

        client.close()
      }
    }

とりあえず、なんとかなりましたね。

まとめ

それなりに苦労しましたが、Compatibility Modeを使ってHot RodとRESTでCacheを相互に使うことができました。ちょっと、module.xmlの修正には不安が残りますが…。

あと、Infinispan Serverの異なるEndpointをコンパチにするのは、どうなのかなぁとちょっと思ったり。「application/x-java-serialized-object」でRESTアクセスさせるくらいなら、もうHot Rodでいいような気も。
コンパチにするなら、Embedded ModeとHot Rodかなぁという気がしました。自分は、うまく設定できてませんけど。

まあ、Remote CacheのEndpointのコンパチについては、一応できましたということで。

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-hotrod-rest-compatibility