以前から、何回か挑戦していてずっとうまくいっていなかったネタです。
InfinispanにCompatibility Modeというものがあり、異なるアクセス方法を取るCache(例えばHot RodとREST)を相互に使用可能にする機能です。
33.1. Enable Compatibility Mode
http://infinispan.org/docs/7.2.x/user_guide/user_guide.html#_enable_compatibility_mode
以前、これを使ってHot RodとRESTのEndpointをコンパチにして遊んでいたことがあります。
Infinispan Serverの、Hot RodとRESTのEndpointをコンパチにする
http://d.hatena.ne.jp/Kazuhira/20150522/1432302400
今回は、Embedded CacheとHot Rodをコンパチにしようというネタです。
こちらのソースコードを参考にして、今回試してみました。
準備
依存関係の定義。
build.sbt
name := "embedded-remote-compatibility-mode" version := "0.0.1-SNAPSHOT" scalaVersion := "2.11.7" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) fork in Test := true parallelExecution in Test := false libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "7.2.3.Final", "org.infinispan" % "infinispan-client-hotrod" % "7.2.3.Final", "org.infinispan" % "infinispan-server-hotrod" % "7.2.3.Final", "net.jcip" % "jcip-annotations" % "1.0" % "provided", "org.scalatest" %% "scalatest" % "2.2.5" % "test" )
「infinispan-core」と「infinispan-client-hotrod」がいるのはいいとして、「infinispan-server-hotrod」という見慣れないものがいます。
これは、Hot RodのServer用モジュールです。
つまり、今回はEmbedded Cacheを使いつつ、一緒にHot Rod Serverも起動してそのCacheを相互に使います。なんでInfinispan Server Distributionじゃないの?というのは、最後に…。
今回のInfinispanの設定ファイルは、こちら。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:7.2 http://www.infinispan.org/schemas/infinispan-config-7.2.xsd" xmlns="urn:infinispan:config:7.2"> <jgroups> <stack-file name="udp" path="jgroups.xml"/> </jgroups> <cache-container name="clustereded" shutdown-hook="REGISTER"> <transport cluster="clustered" stack="udp"/> <jmx duplicate-domains="true"/> <distributed-cache name="noCompatibilityCache"/> <distributed-cache name="compatibilityCache"> <compatibility/> </distributed-cache> </cache-container> </infinispan>
Compatibility Modeを有効にしたCacheと、そうでないCacheの2つを用意しました。
Compatibility Modeを有効にするには、最低限これをしておけばOKです。
<distributed-cache name="compatibilityCache"> <compatibility/> </distributed-cache>
JGroupsの設定は、端折ります。
確認してみる
それでは、確認のためにテストコードを書いてみます。テストコードの雛形は、こちら。
src/test/scala/org/littlewings/infinispan/compatibility/CompatibilitySpec.scala
package org.littlewings.infinispan.compatibility import org.infinispan.Cache import org.infinispan.client.hotrod.RemoteCacheManager import org.infinispan.client.hotrod.configuration.ConfigurationBuilder import org.infinispan.manager.DefaultCacheManager import org.infinispan.server.hotrod.HotRodServer import org.infinispan.server.hotrod.configuration.HotRodServerConfigurationBuilder import org.scalatest.FunSpec import org.scalatest.Matchers._ class CompatibilitySpec extends FunSpec { describe("Compatibility Spec") { // ここに、テストを書く1! } protected def withEmbeddedCache[K, V](numInstances: Int, cacheName: String)(f: Cache[K, V] => Unit): Unit = { val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml")) managers.foreach(_.getCache[K, V](cacheName)) try { val cache = managers.head.getCache[K, V](cacheName) f(cache) cache.stop() } finally { managers.foreach(_.stop()) } } }
Embedded Cacheについては、簡単にクラスタを構成するためのメソッドを付けています。
では、まずCompatibility Modeが無効のCacheで試してみます。
it("disabled compatibility mode, simple") { withEmbeddedCache[String, String](2, "noCompatibilityCache") { embeddedCache => val embeddedCacheManager = embeddedCache.getCacheManager val remoteServerPort = 11222 val remoteServer = new HotRodServer remoteServer.start( new HotRodServerConfigurationBuilder().host("localhost").port(remoteServerPort).build, embeddedCacheManager ) val remoteCacheManager = new RemoteCacheManager( new ConfigurationBuilder().addServers(s"localhost:$remoteServerPort").build ) val remoteCache = remoteCacheManager.getCache[String, String]("noCompatibilityCache") embeddedCache.put("key-from-embedded", "value-from-embedded") remoteCache.get("key-from-embedded") should be(null) embeddedCache.get("key-from-embedded") should be("value-from-embedded") remoteCache.put("key-from-remote", "value-from-remote") embeddedCache.get("key-from-remote") should be(null) remoteCache.get("key-from-remote") should be(null) remoteCache.stop() remoteServer.stop } }
ポイントは、まずここですね。
val remoteServerPort = 11222 val remoteServer = new HotRodServer remoteServer.start( new HotRodServerConfigurationBuilder().host("localhost").port(remoteServerPort).build, embeddedCacheManager )
これで、Hot RodのServerを起動できます。Embedded CacheのCacheManagerが必要ですが、意外と簡単に立てられるんですねぇ…。
同じ感じで、MemcachedやRESTのServerでもOKそうな気がします。
ちなみに、停止はstopすればよさそうです。
remoteServer.stop
その他、RemoteCacheの取得方法などの説明は端折ります。
で、Compatibility Modeを有効にしていないと、片方のCacheに登録したエントリを、別のCacheから読み出すことができません。
embeddedCache.put("key-from-embedded", "value-from-embedded") remoteCache.get("key-from-embedded") should be(null) embeddedCache.get("key-from-embedded") should be("value-from-embedded") remoteCache.put("key-from-remote", "value-from-remote") embeddedCache.get("key-from-remote") should be(null) remoteCache.get("key-from-remote") should be(null)
また、RemoteCacheにputしたエントリがRemoteCacheからgetで読み出せていないのですが、これはEquivalenceを設定していないことが原因です。
Infinispan ServerをEmbeddedで動かす - CLOVER
それでは、今度はComatibility Modeを有効にしたCacheで試してみます。
it("enabled compatibility mode, simple") { withEmbeddedCache[String, String](2, "compatibilityCache") { embeddedCache => val embeddedCacheManager = embeddedCache.getCacheManager val remoteServerPort = 11222 val remoteServer = new HotRodServer remoteServer.start( new HotRodServerConfigurationBuilder().host("localhost").port(remoteServerPort).build, embeddedCacheManager ) val remoteCacheManager = new RemoteCacheManager( new ConfigurationBuilder().addServers(s"localhost:$remoteServerPort").build ) val remoteCache = remoteCacheManager.getCache[String, String]("compatibilityCache") embeddedCache.put("key-from-embedded", "value-from-embedded") remoteCache.get("key-from-embedded") should be("value-from-embedded") embeddedCache.get("key-from-embedded") should be("value-from-embedded") remoteCache.put("key-from-remote", "value-from-remote") embeddedCache.get("key-from-remote") should be("value-from-remote") remoteCache.get("key-from-remote") should be("value-from-remote") remoteCache.stop() remoteServer.stop } }
こちらなら、OKでした!
embeddedCache.put("key-from-embedded", "value-from-embedded") remoteCache.get("key-from-embedded") should be("value-from-embedded") embeddedCache.get("key-from-embedded") should be("value-from-embedded") remoteCache.put("key-from-remote", "value-from-remote") embeddedCache.get("key-from-remote") should be("value-from-remote") remoteCache.get("key-from-remote") should be("value-from-remote")
しかも、RemoteCacheでputしたエントリを読み出せるようにもなっています。
あともう1パターンとして、自分でクラスを定義して試してみます。
src/test/scala/org/littlewings/infinispan/compatibility/Person.scala
package org.littlewings.infinispan.compatibility import java.util.Objects class Person(val name: String, val age: Int) extends Serializable { override def equals(o: Any): Boolean = o match { case other: Person => Objects.equals(name, other.name) && Objects.equals(age, other.age) case _ => false } override def hashCode: Int = Objects.hash(name, Integer.valueOf(age)) }
こちらも、問題なく動きましたよ。
it("enabled compatibility mode, use case class") { withEmbeddedCache[String, Person](2, "compatibilityCache") { embeddedCache => val embeddedCacheManager = embeddedCache.getCacheManager val remoteServerPort = 11222 val remoteServer = new HotRodServer remoteServer.start( new HotRodServerConfigurationBuilder().host("localhost").port(remoteServerPort).build, embeddedCacheManager ) val remoteCacheManager = new RemoteCacheManager( new ConfigurationBuilder().addServers(s"localhost:$remoteServerPort").build ) val remoteCache = remoteCacheManager.getCache[String, Person]("compatibilityCache") embeddedCache.put("key-from-embedded", new Person("磯野カツオ", 11)) remoteCache.get("key-from-embedded") should be(new Person("磯野カツオ", 11)) embeddedCache.get("key-from-embedded") should be(new Person("磯野カツオ", 11)) remoteCache.put("key-from-remote", new Person("磯野ワカメ", 9)) embeddedCache.get("key-from-remote") should be(new Person("磯野ワカメ", 9)) remoteCache.get("key-from-remote") should be(new Person("磯野ワカメ", 9)) remoteCache.stop() remoteServer.stop } }
まとめ
InfinispanのCompatibility Modeを使用して、Emebedded CacheとHot RodでのRemote Cacheをコンパチにしてみました。また、Infinispan Serverを意外と簡単に立てられそうなところが、個人的には収穫だったかなぁと。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-remote-compatibility-mode
オマケ
最初に、以前から挑戦していてうまくいっていなかったみたいなことを書きました。
で、今回も結局それは越えられなかったのですが…。
何で困っていたかというと、Infinispan Server Distributionを使ってCompatibility Modeを試した時に、うまくクラスタに入れなかったんですよね。Nodeは見つけてくれるのですが、JBoss Marshalling RiverでのUnmarshallingに失敗するという…。
6 24, 2015 11:16:05 午後 org.jgroups.blocks.RequestCorrelator receiveMessage 重大: failed unmarshalling buffer into return value java.io.UTFDataFormatException: Invalid byte at org.jboss.marshalling.UTFUtils.readUTFBytes(UTFUtils.java:162) at org.jboss.marshalling.river.RiverUnmarshaller.readUTF(RiverUnmarshaller.java:1833) at org.jboss.marshalling.river.RiverUnmarshaller.doReadClassDescriptor(RiverUnmarshaller.java:959) at org.jboss.marshalling.river.RiverUnmarshaller.doReadNewObject(RiverUnmarshaller.java:1255) at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:276) at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:209) at org.jboss.marshalling.AbstractObjectInput.readObject(AbstractObjectInput.java:41) at org.infinispan.topology.CacheTopology$Externalizer.doReadObject(CacheTopology.java:204) at org.infinispan.topology.CacheTopology$Externalizer.doReadObject(CacheTopology.java:186) at org.infinispan.commons.marshall.InstanceReusingAdvancedExternalizer.readObject(InstanceReusingAdvancedExternalizer.java:102) at org.infinispan.marshall.core.ExternalizerTable$ExternalizerAdapter.readObject(ExternalizerTable.java:436) at org.infinispan.marshall.core.ExternalizerTable.readObject(ExternalizerTable.java:227) at org.infinispan.marshall.core.JBossMarshaller$ExternalizerTableProxy.readObject(JBossMarshaller.java:153) at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:354) at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:209) at org.jboss.marshalling.AbstractObjectInput.readObject(AbstractObjectInput.java:41) at org.infinispan.topology.CacheStatusResponse$Externalizer.readObject(CacheStatusResponse.java:73) at org.infinispan.topology.CacheStatusResponse$Externalizer.readObject(CacheStatusResponse.java:60) at org.infinispan.marshall.core.ExternalizerTable$ExternalizerAdapter.readObject(ExternalizerTable.java:436) at org.infinispan.marshall.core.ExternalizerTable.readObject(ExternalizerTable.java:227) at org.infinispan.marshall.core.JBossMarshaller$ExternalizerTableProxy.readObject(JBossMarshaller.java:153) at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:354) at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:209) at org.jboss.marshalling.AbstractObjectInput.readObject(AbstractObjectInput.java:41) at org.infinispan.remoting.responses.SuccessfulResponse$Externalizer.readObject(SuccessfulResponse.java:92) at org.infinispan.remoting.responses.SuccessfulResponse$Externalizer.readObject(SuccessfulResponse.java:77) at org.infinispan.marshall.core.ExternalizerTable$ExternalizerAdapter.readObject(ExternalizerTable.java:436) at org.infinispan.marshall.core.ExternalizerTable.readObject(ExternalizerTable.java:227) at org.infinispan.marshall.core.JBossMarshaller$ExternalizerTableProxy.readObject(JBossMarshaller.java:153) at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:354) at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:209) at org.jboss.marshalling.AbstractObjectInput.readObject(AbstractObjectInput.java:41) at org.infinispan.commons.marshall.jboss.AbstractJBossMarshaller.objectFromObjectStream(AbstractJBossMarshaller.java:134) at org.infinispan.marshall.core.VersionAwareMarshaller.objectFromByteBuffer(VersionAwareMarshaller.java:101) at org.infinispan.commons.marshall.AbstractDelegatingMarshaller.objectFromByteBuffer(AbstractDelegatingMarshaller.java:80) at org.infinispan.remoting.transport.jgroups.MarshallerAdapter.objectFromBuffer(MarshallerAdapter.java:28) at org.jgroups.blocks.RequestCorrelator.receiveMessage(RequestCorrelator.java:390) at org.jgroups.blocks.RequestCorrelator.receive(RequestCorrelator.java:250) at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:675) at org.jgroups.JChannel.up(JChannel.java:739) at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:1029) at org.jgroups.protocols.FRAG2.up(FRAG2.java:165) at org.jgroups.protocols.FlowControl.up(FlowControl.java:394) at org.jgroups.protocols.FlowControl.up(FlowControl.java:383) at org.jgroups.protocols.pbcast.GMS.up(GMS.java:1042) at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:234) at org.jgroups.protocols.UNICAST3.deliverMessage(UNICAST3.java:1064) at org.jgroups.protocols.UNICAST3.handleDataReceived(UNICAST3.java:779) at org.jgroups.protocols.UNICAST3.up(UNICAST3.java:426) at org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:652) at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:155) at org.jgroups.protocols.FD_ALL.up(FD_ALL.java:200) at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:297) at org.jgroups.protocols.MERGE3.up(MERGE3.java:288) at org.jgroups.protocols.Discovery.up(Discovery.java:291) at org.jgroups.protocols.TP.passMessageUp(TP.java:1577) at org.jgroups.protocols.TP$MyHandler.run(TP.java:1796) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
どこかに普通に単体で書いたEmbedded Cacheと、Infinispan Serverをコンパチにしてるサンプルとか転がってないかな…。