CLOVER🍀

That was when it all began.

InfinispanのRemote Storeを試してみる

ちょっと興味があって、InfinispanのRemote Storeを試してみました。

4.9. Remote store
http://infinispan.org/docs/6.0.x/user_guide/user_guide.html#_remote_store

Remote Storeというのは、InfinispanのCacheStoreのうちのひとつで、Remote Cache(Hot Rod)をデータの永続化先として使用します。自分は、今までFileStore、しかもSingleFileStoreになる前のものしか触ったことがありませんが。

では、試してみましょう。今回は、こんな構成を取ってみます。

Infinispan Embedded Cache(3 Node:Dist Cache) → Infinispan Server(2 Node:Dist Cache)

ムダに、Embedded CacheもDist Cacheにします。裏のInfinispan ServerのCacheは特に永続化しませんが、動作確認の間は起動したままにさせておくのでいいとしましょう。

準備

まずは、依存関係の定義。
build.sbt

name := "infinispan-remote-cachestore"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.1"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

incOptions := incOptions.value.withNameHashing(true)

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-cachestore-remote" % "6.0.2.Final" excludeAll(
    ExclusionRule(organization = "org.jgroups", name = "jgroups"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"),
    ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"),
    ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec")
  ),
  "org.jgroups" % "jgroups" % "3.4.1.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final",
  "org.jboss.marshalling" % "jboss-marshalling-river" % "1.4.4.Final",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.4.Final",
  "org.jboss.logging" % "jboss-logging" % "3.1.2.GA",
  "net.jcip" % "jcip-annotations" % "1.0",
  "org.scalatest" %% "scalatest" % "2.1.7" % "test"
)

Remote Storeを使うためには

"org.infinispan" % "infinispan-cachestore-remote" % "6.0.2.Final"

があればいいのですが、その他もろもろあるのはsbtの都合だったり、テストコードのためだったりします。

クラスタは組みますが、JGroupsの設定は端折りまして、Infinispanの設定。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">

  <global>
    <transport clusterName="RemoteCacheStoreCluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        allowDuplicateDomains="true"
        />

    <shutdown hookBehavior="REGISTER"/>
  </global>

  <default />

  <namedCache name="noStoreCache">
    〜省略〜
  </namedCache>

  <namedCache name="remoteStoreCache">
    〜省略〜
  </namedCache>

  <namedCache name="remoteStoreCacheAsRaw">
    〜省略〜
  </namedCache>
</infinispan>

namedCacheの中身は、実際の利用時に記載していきます。

テストコードの骨格

動作確認は、いきなりですがScalaTestを使ったコードで行います。
src/test/scala/org/littlewings/infinispan/remotecachestore/InfinispanRemoteCacheStoreSpec.scala

package org.littlewings.infinispan.remotecachestore

import scala.collection.JavaConverters._

import java.util.Objects

import org.infinispan.Cache
import org.infinispan.client.hotrod.{RemoteCache, RemoteCacheManager}
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.infinispan.container.entries.InternalCacheEntry
import org.infinispan.manager.DefaultCacheManager

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class InfinispanRemoteCacheStoreSpec extends FunSpec {

  // ここに、テストを書く!

  def withCache(numInstances: Int, cacheName: String)(fun: Cache[String, Entity] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))
    val cache = managers.head.getCache[String, Entity](cacheName)

    try {
      fun(cache)
    } finally {
      for {
        manager <- managers
        cacheName <- manager.getCacheNames.asScala
      } {
        manager.getCache[Any, Any](cacheName).stop()
      }

      managers.foreach(_.stop())
    }
  }

  def withRemoteCache[K, V](cacheName: String)(fun: RemoteCache[K, V] => Unit): Unit = {
    val manager =
      new RemoteCacheManager(
        new ConfigurationBuilder()
          .addServer
          .host("localhost")
          .port(11222)
          .host("localhost")
          .port(12222)
          .build
        )

    try {
      val cache = manager.getCache[K, V](cacheName)
      fun(cache)
    } finally {
      manager.stop()
    }
  }
}

EmbeddedCacheをひとつのプロセスでクラスタ化して使うためのメソッドと、Remote Cacheにアクセスするためのメソッドを用意しました。

また、Cacheに保存するクラスは、以下の定義とします。

@SerialVersionUID(1L)
class Entity(val value: String) extends Serializable {
  override def equals(other: Any): Boolean =
    other match {
      case o: Entity => value == o.value
      case _ => false
    }

  override def hashCode: Int =
    Objects.hash(value)
}

それでは、これらを使ってコードを書いていきます。

Infinispan Serverの起動

…の前に、先にInfinispan Serverを起動しておきます。

先に、設定ファイルに以下のCacheの定義を追記
infinispan-server-6.0.2.Final/standalone/configuration/clustered.xml

                <distributed-cache name="namedCache" mode="SYNC" start="EAGER"/>
                <distributed-cache name="storeCache" mode="SYNC" start="EAGER"/>
                <distributed-cache name="storeCacheAsRaw" mode="SYNC" start="EAGER"/>

もともとあった、「namedCache」の下に2つDistributed Cacheを追加しています。

これを使って、Infinispan Serverを2つ起動します。

ひとつめ。

$ infinispan-server-6.0.2.Final/bin/clustered.sh -Djboss.node.name=node1

ふたつめ。

$ infinispan-server-6.0.2.Final/bin/clustered.sh -Djboss.node.name=node2 -Djboss.socket.binding.port-offset=1000

同じマシン上で2つInfinispan Serverを起動するので、それぞれにNode名を与えて、2つ目のNodeにはポートを1,000ずらしてもらいました。

ところで、Infinispan Serverですが、Oracle Java 8では起動しませんでした。
以下のログが出たところで、止まっちゃうんですよね…。

18:35:48,076 INFO  [org.jboss.as] (MSC service thread 1-1) JBAS015899: JBoss Infinispan Server 6.0.2.Final (AS 7.2.0.Final) starting

仕方がないので、Infinispan ServerはJava 7で起動してもらいました。

では、今渡こそテストコードを書いていきます。

CacheStoreなし

まずは、CacheStoreのない普通のnamedCacheで試してみましょう。Cacheの定義は、以下とします。

  <namedCache name="noStoreCache">
    <clustering mode="dist" />
  </namedCache>

ムダにDistributed Cache。

これに対するテストコード。

  describe("no cache-store spec") {
    it("cluster down, missing data") {
      val clusterSize = 3
      val keysValues = (1 to 5).map(i => (s"key$i", new Entity(s"value$i")))

      withCache(clusterSize, "noStoreCache") { cache =>
        keysValues.foreach { case (k, v) => cache.put(k, v) }

        keysValues.foreach { case (k, v) =>
          cache.get(k) should be (new Entity(v.value))
        } 
      }

      withCache(clusterSize, "noStoreCache") { cache =>
        keysValues.foreach { case (k, v) =>
          cache.get(k) should be (null)
        }
      }
    }
  }

先ほどの説明通り、クラスタのサイズは3にしています。

withCacheメソッドを抜けるとクラスタがダウンするので、Cacheから保存したデータは取得できなくなっています。

      withCache(clusterSize, "noStoreCache") { cache =>
        keysValues.foreach { case (k, v) =>
          cache.get(k) should be (null)
        }
      }

まあ、普通ですね。

Remote Storeを使う

では、続いてRemote Storeを使った定義。

  <namedCache name="remoteStoreCache">
    <clustering mode="dist" />
    <persistence>
      <remoteStore xmlns="urn:infinispan:config:remote:6.0"
                   fetchPersistentState="false"
                   ignoreModifications="false"
                   purgeOnStartup="false"
                   remoteCacheName="storeCache">
        <servers>
          <server host="localhost" port="11222" />
          <server host="localhost" port="12222" />
        </servers>
        <connectionPool maxActive="10" exhaustedAction="CREATE_NEW" />
        <async enabled="false" />
      </remoteStore>
    </persistence>
  </namedCache>

保存先のRemote Cacheの名前は「storeCache」で、Infinispan Serverは2つNodeがあるという設定になっています。

設定は、先ほどのこちらと

4.9. Remote store
http://infinispan.org/docs/6.0.x/user_guide/user_guide.html#_remote_store

Remote StoreのXML Schemaのドキュメントを見ながら書いています。

urn:infinispan:config:remote:6.0
http://docs.jboss.org/infinispan/6.0/configdocs/infinispan-cachestore-remote-config-6.0.html

テストコード。

  describe("remote-cache-store spec") {
    it("saved data") {
      val clusterSize = 3
      val keysValues = (1 to 5).map(i => (s"key$i", new Entity(s"value$i")))

      withCache(clusterSize, "remoteStoreCache") { cache =>
        keysValues.foreach { case (k, v) => cache.put(k, v) }

        keysValues.foreach { case (k, v) =>
          cache.get(k) should be (new Entity(v.value))
        } 
      }

      withCache(clusterSize, "remoteStoreCache") { cache =>
        keysValues.foreach { case (k, v) =>
          cache.get(k) should be (new Entity(v.value))
        }
      }

      withRemoteCache[String, InternalCacheEntry]("storeCache") { remoteCache =>
        remoteCache should have size (keysValues.size)

        remoteCache.keySet should have size (1)
      }
    }
  }

Remote Storeがあるので、withCacheメソッドを抜けた後でも、Cacheに保存したエントリが取得できることが確認できます。

      withCache(clusterSize, "remoteStoreCache") { cache =>
        keysValues.foreach { case (k, v) =>
          cache.get(k) should be (new Entity(v.value))
        }
      }

あと、Remote Cacheに直接アクセスした場合は、Remote Cacheのエントリが保存する(今回はEntryというクラスのインスタンスを5つ保存している)数はEmbedded Cacheにputした数と同じですが、Remote Cacheから直接値を取得することはできなさそうです。

      withRemoteCache[String, InternalCacheEntry]("storeCache") { remoteCache =>
        remoteCache should have size (keysValues.size)

        remoteCache.keySet should have size (1)
      }

サイズこそ同じなものの、RemoteCache#keySetの結果が1になりました…。

Remote Storeに保存しつつ、Remote Cacheからもエントリを見れるようにする

先ほどのサンプルでは、Remote Storeに保存は行われるものの、Remote Cacheで保存したエントリを見ることはできませんでした。

これを見れるようにしたのが、以下のCache定義になります。

  <namedCache name="remoteStoreCacheAsRaw">
    <clustering mode="dist" />
    <persistence>
      <remoteStore xmlns="urn:infinispan:config:remote:6.0"
                   fetchPersistentState="false"
                   ignoreModifications="false"
                   purgeOnStartup="false"
                   remoteCacheName="storeCacheAsRaw"
                   rawValues="true">
        <servers>
          <server host="localhost" port="11222" />
          <server host="localhost" port="12222" />
        </servers>
        <connectionPool maxActive="10" exhaustedAction="CREATE_NEW" />
        <async enabled="false" />
      </remoteStore>
    </persistence>
  </namedCache>

ほとんど先の例と違いがないんですが、remoteStore要素の「rawValues」属性をtrueにしていることが変更点です。

それでは、テストコード。

  describe("remote-cache-store as raw spec") {
    it("saved raw data") {
      val clusterSize = 3
      val keysValues = (1 to 5).map(i => (s"key$i", new Entity(s"value$i")))

      withCache(clusterSize, "remoteStoreCacheAsRaw") { cache =>
        keysValues.foreach { case (k, v) => cache.put(k, v) }

        keysValues.foreach { case (k, v) =>
          cache.get(k) should be (new Entity(v.value))
        } 
      }

      withCache(clusterSize, "remoteStoreCacheAsRaw") { cache =>
        keysValues.foreach { case (k, v) =>
          cache.get(k) should be (new Entity(v.value))
        }
      }

      withRemoteCache[String, Entity]("storeCacheAsRaw") { remoteCache =>
        remoteCache should have size (keysValues.size)

        keysValues.foreach { case (k, v) =>
          remoteCache.get(k) should be (new Entity(v.value))
        }
      }
    }
  }

先ほどと同様、Embedded Cacheで構成されたクラスタがダウンした後もエントリが取得できます。

      withCache(clusterSize, "remoteStoreCacheAsRaw") { cache =>
        keysValues.foreach { case (k, v) =>
          cache.get(k) should be (new Entity(v.value))
        }
      }

これに加えて、Remote Cacheからもエントリが確認できるようになっています。

      withRemoteCache[String, Entity]("storeCacheAsRaw") { remoteCache =>
        remoteCache should have size (keysValues.size)

        keysValues.foreach { case (k, v) =>
          remoteCache.get(k) should be (new Entity(v.value))
        }
      }

XML SchemaでのrawValuesの説明は、以下のようになっています。

Normally the RemoteCacheStore stores values wrapped in InternalCacheEntry. Setting this property to true causes the raw values to be stored instead for interoperability with direct access by RemoteCacheManager. Defaults to false (disabled)

http://docs.jboss.org/infinispan/6.0/configdocs/infinispan-cachestore-remote-config-6.0.html

デフォルトではInternalCacheEntryとして保存されるそうですが、rawValuesをtrueにすることで、RemoteCacheManagerを直接使用した時の相互運用性が保たれるというわけですね。

もう少し、Cache Storeを追ってもいいかなーという気がしてきましたね。JDBC Storeも試してみようかな…。

今回書いたコードは、こちらにアップしています。
https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-remote-cachestore