CLOVER🍀

That was when it all began.

InfinispanのEmbedded CacheとRemote Cache(Hot Rod)をコンパチにする

以前から、何回か挑戦していてずっとうまくいっていなかったネタです。

InfinispanにCompatibility Modeというものがあり、異なるアクセス方法を取るCache(例えばHot RodとREST)を相互に使用可能にする機能です。

33.1. Enable Compatibility Mode
http://infinispan.org/docs/7.2.x/user_guide/user_guide.html#_enable_compatibility_mode

以前、これを使ってHot RodとRESTのEndpointをコンパチにして遊んでいたことがあります。

Infinispan Serverの、Hot RodとRESTのEndpointをコンパチにする
http://d.hatena.ne.jp/Kazuhira/20150522/1432302400

今回は、Embedded CacheとHot Rodをコンパチにしようというネタです。

こちらのソースコードを参考にして、今回試してみました。

https://github.com/infinispan/infinispan/tree/master/integrationtests/compatibility-mode-it/src/test/java/org/infinispan/it/compatibility

準備

依存関係の定義。
build.sbt

name := "embedded-remote-compatibility-mode"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.7"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := true

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "7.2.3.Final",
  "org.infinispan" % "infinispan-client-hotrod" % "7.2.3.Final",
  "org.infinispan" % "infinispan-server-hotrod" % "7.2.3.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

「infinispan-core」と「infinispan-client-hotrod」がいるのはいいとして、「infinispan-server-hotrod」という見慣れないものがいます。

これは、Hot RodのServer用モジュールです。

つまり、今回はEmbedded Cacheを使いつつ、一緒にHot Rod Serverも起動してそのCacheを相互に使います。なんでInfinispan Server Distributionじゃないの?というのは、最後に…。

今回のInfinispanの設定ファイルは、こちら。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:7.2 http://www.infinispan.org/schemas/infinispan-config-7.2.xsd"
        xmlns="urn:infinispan:config:7.2">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container name="clustereded" shutdown-hook="REGISTER">
        <transport cluster="clustered" stack="udp"/>
        <jmx duplicate-domains="true"/>

        <distributed-cache name="noCompatibilityCache"/>

        <distributed-cache name="compatibilityCache">
            <compatibility/>
        </distributed-cache>
    </cache-container>
</infinispan>

Compatibility Modeを有効にしたCacheと、そうでないCacheの2つを用意しました。

Compatibility Modeを有効にするには、最低限これをしておけばOKです。

        <distributed-cache name="compatibilityCache">
            <compatibility/>
        </distributed-cache>

JGroupsの設定は、端折ります。

確認してみる

それでは、確認のためにテストコードを書いてみます。テストコードの雛形は、こちら。
src/test/scala/org/littlewings/infinispan/compatibility/CompatibilitySpec.scala

package org.littlewings.infinispan.compatibility

import org.infinispan.Cache
import org.infinispan.client.hotrod.RemoteCacheManager
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.server.hotrod.HotRodServer
import org.infinispan.server.hotrod.configuration.HotRodServerConfigurationBuilder
import org.scalatest.FunSpec
import org.scalatest.Matchers._

class CompatibilitySpec extends FunSpec {
  describe("Compatibility Spec") {
    // ここに、テストを書く1!
  }

  protected def withEmbeddedCache[K, V](numInstances: Int, cacheName: String)(f: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))
    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers.head.getCache[K, V](cacheName)

      f(cache)

      cache.stop()
    } finally {
      managers.foreach(_.stop())
    }
  }
}

Embedded Cacheについては、簡単にクラスタを構成するためのメソッドを付けています。

では、まずCompatibility Modeが無効のCacheで試してみます。

    it("disabled compatibility mode, simple") {
      withEmbeddedCache[String, String](2, "noCompatibilityCache") { embeddedCache =>
        val embeddedCacheManager = embeddedCache.getCacheManager

        val remoteServerPort = 11222
        val remoteServer = new HotRodServer
        remoteServer.start(
          new HotRodServerConfigurationBuilder().host("localhost").port(remoteServerPort).build,
          embeddedCacheManager
        )

        val remoteCacheManager =
          new RemoteCacheManager(
            new ConfigurationBuilder().addServers(s"localhost:$remoteServerPort").build
          )
        val remoteCache = remoteCacheManager.getCache[String, String]("noCompatibilityCache")

        embeddedCache.put("key-from-embedded", "value-from-embedded")
        remoteCache.get("key-from-embedded") should be(null)
        embeddedCache.get("key-from-embedded") should be("value-from-embedded")

        remoteCache.put("key-from-remote", "value-from-remote")
        embeddedCache.get("key-from-remote") should be(null)
        remoteCache.get("key-from-remote") should be(null)

        remoteCache.stop()
        remoteServer.stop
      }
    }

ポイントは、まずここですね。

        val remoteServerPort = 11222
        val remoteServer = new HotRodServer
        remoteServer.start(
          new HotRodServerConfigurationBuilder().host("localhost").port(remoteServerPort).build,
          embeddedCacheManager
        )

これで、Hot RodのServerを起動できます。Embedded CacheのCacheManagerが必要ですが、意外と簡単に立てられるんですねぇ…。

同じ感じで、MemcachedやRESTのServerでもOKそうな気がします。

ちなみに、停止はstopすればよさそうです。

        remoteServer.stop

その他、RemoteCacheの取得方法などの説明は端折ります。

で、Compatibility Modeを有効にしていないと、片方のCacheに登録したエントリを、別のCacheから読み出すことができません。

        embeddedCache.put("key-from-embedded", "value-from-embedded")
        remoteCache.get("key-from-embedded") should be(null)
        embeddedCache.get("key-from-embedded") should be("value-from-embedded")

        remoteCache.put("key-from-remote", "value-from-remote")
        embeddedCache.get("key-from-remote") should be(null)
        remoteCache.get("key-from-remote") should be(null)

また、RemoteCacheにputしたエントリがRemoteCacheからgetで読み出せていないのですが、これはEquivalenceを設定していないことが原因です。

Infinispan ServerをEmbeddedで動かす - CLOVER

それでは、今度はComatibility Modeを有効にしたCacheで試してみます。

    it("enabled compatibility mode, simple") {
      withEmbeddedCache[String, String](2, "compatibilityCache") { embeddedCache =>
        val embeddedCacheManager = embeddedCache.getCacheManager

        val remoteServerPort = 11222
        val remoteServer = new HotRodServer
        remoteServer.start(
          new HotRodServerConfigurationBuilder().host("localhost").port(remoteServerPort).build,
          embeddedCacheManager
        )

        val remoteCacheManager =
          new RemoteCacheManager(
            new ConfigurationBuilder().addServers(s"localhost:$remoteServerPort").build
          )
        val remoteCache = remoteCacheManager.getCache[String, String]("compatibilityCache")

        embeddedCache.put("key-from-embedded", "value-from-embedded")
        remoteCache.get("key-from-embedded") should be("value-from-embedded")
        embeddedCache.get("key-from-embedded") should be("value-from-embedded")

        remoteCache.put("key-from-remote", "value-from-remote")
        embeddedCache.get("key-from-remote") should be("value-from-remote")
        remoteCache.get("key-from-remote") should be("value-from-remote")

        remoteCache.stop()
        remoteServer.stop
      }
    }

こちらなら、OKでした!

        embeddedCache.put("key-from-embedded", "value-from-embedded")
        remoteCache.get("key-from-embedded") should be("value-from-embedded")
        embeddedCache.get("key-from-embedded") should be("value-from-embedded")

        remoteCache.put("key-from-remote", "value-from-remote")
        embeddedCache.get("key-from-remote") should be("value-from-remote")
        remoteCache.get("key-from-remote") should be("value-from-remote")

しかも、RemoteCacheでputしたエントリを読み出せるようにもなっています。

あともう1パターンとして、自分でクラスを定義して試してみます。
src/test/scala/org/littlewings/infinispan/compatibility/Person.scala

package org.littlewings.infinispan.compatibility

import java.util.Objects

class Person(val name: String, val age: Int) extends Serializable {
  override def equals(o: Any): Boolean =
  o match {
    case other: Person => Objects.equals(name, other.name) && Objects.equals(age, other.age)
    case _ => false
  }

  override def hashCode: Int = Objects.hash(name, Integer.valueOf(age))
}

こちらも、問題なく動きましたよ。

    it("enabled compatibility mode, use case class") {
      withEmbeddedCache[String, Person](2, "compatibilityCache") { embeddedCache =>
        val embeddedCacheManager = embeddedCache.getCacheManager

        val remoteServerPort = 11222
        val remoteServer = new HotRodServer
        remoteServer.start(
          new HotRodServerConfigurationBuilder().host("localhost").port(remoteServerPort).build,
          embeddedCacheManager
        )

        val remoteCacheManager =
          new RemoteCacheManager(
            new ConfigurationBuilder().addServers(s"localhost:$remoteServerPort").build
          )
        val remoteCache = remoteCacheManager.getCache[String, Person]("compatibilityCache")

        embeddedCache.put("key-from-embedded", new Person("磯野カツオ", 11))
        remoteCache.get("key-from-embedded") should be(new Person("磯野カツオ", 11))
        embeddedCache.get("key-from-embedded") should be(new Person("磯野カツオ", 11))

        remoteCache.put("key-from-remote", new Person("磯野ワカメ", 9))
        embeddedCache.get("key-from-remote") should be(new Person("磯野ワカメ", 9))
        remoteCache.get("key-from-remote") should be(new Person("磯野ワカメ", 9))

        remoteCache.stop()
        remoteServer.stop
      }
    }

まとめ

InfinispanのCompatibility Modeを使用して、Emebedded CacheとHot RodでのRemote Cacheをコンパチにしてみました。また、Infinispan Serverを意外と簡単に立てられそうなところが、個人的には収穫だったかなぁと。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-remote-compatibility-mode

オマケ

最初に、以前から挑戦していてうまくいっていなかったみたいなことを書きました。

で、今回も結局それは越えられなかったのですが…。

何で困っていたかというと、Infinispan Server Distributionを使ってCompatibility Modeを試した時に、うまくクラスタに入れなかったんですよね。Nodeは見つけてくれるのですが、JBoss Marshalling RiverでのUnmarshallingに失敗するという…。

6 24, 2015 11:16:05 午後 org.jgroups.blocks.RequestCorrelator receiveMessage
重大: failed unmarshalling buffer into return value
java.io.UTFDataFormatException: Invalid byte
	at org.jboss.marshalling.UTFUtils.readUTFBytes(UTFUtils.java:162)
	at org.jboss.marshalling.river.RiverUnmarshaller.readUTF(RiverUnmarshaller.java:1833)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadClassDescriptor(RiverUnmarshaller.java:959)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadNewObject(RiverUnmarshaller.java:1255)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:276)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:209)
	at org.jboss.marshalling.AbstractObjectInput.readObject(AbstractObjectInput.java:41)
	at org.infinispan.topology.CacheTopology$Externalizer.doReadObject(CacheTopology.java:204)
	at org.infinispan.topology.CacheTopology$Externalizer.doReadObject(CacheTopology.java:186)
	at org.infinispan.commons.marshall.InstanceReusingAdvancedExternalizer.readObject(InstanceReusingAdvancedExternalizer.java:102)
	at org.infinispan.marshall.core.ExternalizerTable$ExternalizerAdapter.readObject(ExternalizerTable.java:436)
	at org.infinispan.marshall.core.ExternalizerTable.readObject(ExternalizerTable.java:227)
	at org.infinispan.marshall.core.JBossMarshaller$ExternalizerTableProxy.readObject(JBossMarshaller.java:153)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:354)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:209)
	at org.jboss.marshalling.AbstractObjectInput.readObject(AbstractObjectInput.java:41)
	at org.infinispan.topology.CacheStatusResponse$Externalizer.readObject(CacheStatusResponse.java:73)
	at org.infinispan.topology.CacheStatusResponse$Externalizer.readObject(CacheStatusResponse.java:60)
	at org.infinispan.marshall.core.ExternalizerTable$ExternalizerAdapter.readObject(ExternalizerTable.java:436)
	at org.infinispan.marshall.core.ExternalizerTable.readObject(ExternalizerTable.java:227)
	at org.infinispan.marshall.core.JBossMarshaller$ExternalizerTableProxy.readObject(JBossMarshaller.java:153)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:354)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:209)
	at org.jboss.marshalling.AbstractObjectInput.readObject(AbstractObjectInput.java:41)
	at org.infinispan.remoting.responses.SuccessfulResponse$Externalizer.readObject(SuccessfulResponse.java:92)
	at org.infinispan.remoting.responses.SuccessfulResponse$Externalizer.readObject(SuccessfulResponse.java:77)
	at org.infinispan.marshall.core.ExternalizerTable$ExternalizerAdapter.readObject(ExternalizerTable.java:436)
	at org.infinispan.marshall.core.ExternalizerTable.readObject(ExternalizerTable.java:227)
	at org.infinispan.marshall.core.JBossMarshaller$ExternalizerTableProxy.readObject(JBossMarshaller.java:153)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:354)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:209)
	at org.jboss.marshalling.AbstractObjectInput.readObject(AbstractObjectInput.java:41)
	at org.infinispan.commons.marshall.jboss.AbstractJBossMarshaller.objectFromObjectStream(AbstractJBossMarshaller.java:134)
	at org.infinispan.marshall.core.VersionAwareMarshaller.objectFromByteBuffer(VersionAwareMarshaller.java:101)
	at org.infinispan.commons.marshall.AbstractDelegatingMarshaller.objectFromByteBuffer(AbstractDelegatingMarshaller.java:80)
	at org.infinispan.remoting.transport.jgroups.MarshallerAdapter.objectFromBuffer(MarshallerAdapter.java:28)
	at org.jgroups.blocks.RequestCorrelator.receiveMessage(RequestCorrelator.java:390)
	at org.jgroups.blocks.RequestCorrelator.receive(RequestCorrelator.java:250)
	at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:675)
	at org.jgroups.JChannel.up(JChannel.java:739)
	at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:1029)
	at org.jgroups.protocols.FRAG2.up(FRAG2.java:165)
	at org.jgroups.protocols.FlowControl.up(FlowControl.java:394)
	at org.jgroups.protocols.FlowControl.up(FlowControl.java:383)
	at org.jgroups.protocols.pbcast.GMS.up(GMS.java:1042)
	at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:234)
	at org.jgroups.protocols.UNICAST3.deliverMessage(UNICAST3.java:1064)
	at org.jgroups.protocols.UNICAST3.handleDataReceived(UNICAST3.java:779)
	at org.jgroups.protocols.UNICAST3.up(UNICAST3.java:426)
	at org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:652)
	at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:155)
	at org.jgroups.protocols.FD_ALL.up(FD_ALL.java:200)
	at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:297)
	at org.jgroups.protocols.MERGE3.up(MERGE3.java:288)
	at org.jgroups.protocols.Discovery.up(Discovery.java:291)
	at org.jgroups.protocols.TP.passMessageUp(TP.java:1577)
	at org.jgroups.protocols.TP$MyHandler.run(TP.java:1796)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

どこかに普通に単体で書いたEmbedded Cacheと、Infinispan Serverをコンパチにしてるサンプルとか転がってないかな…。