久々にInfinispan。1ヵ月振りなので、簡単そうなものをちょっと触ってみました。
Asynchronous API
https://docs.jboss.org/author/display/ISPN/Asynchronous+API
読んで字のごとく、非同期APIです。
Cache#putやget、removeなどのメソッドにAsyncを付けることで、非同期化することができます。
http://docs.jboss.org/infinispan/5.2/apidocs/org/infinispan/api/BasicCache.html
Embedded Cache、Remote Cacheの共通の親インターフェースに定義されているので、どちらのキャッシュでも使用できます。
基本的には、
cache.putAsync("key1", "...") cache.getAsync("key1")
みたいにするだけです。
各Asyncメソッドの戻り値としては、NotifyingFutureインターフェースを実装したクラスが戻ってきます。
http://docs.jboss.org/infinispan/5.0/apidocs/org/infinispan/util/concurrent/NotifyingFuture.html
こちらは、JDKのFutureインターフェースのサブインターフェースでもあるので、ふつうにFuture#getで待ち合わせをすることができます。
では、試してみましょう。
こういうのは、Remote Cacheで実行するのがわかりやすいだろうと思うので、まずはInfinispan ServerをHot Rodで起動。
$ bin/startServer.sh -r hotrod
用意したソースは、こんな感じです。
build.sbt
name := "infinispan-async" version := "0.0.1" scalaVersion := "2.10.0" organization := "littlewings" fork in run := true javaOptions += "-Xmx1536M" scalacOptions += "-deprecation" resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/" libraryDependencies += "org.infinispan" % "infinispan-client-hotrod" % "5.2.1.Final"
src/main/scala/AsyncOperation.scala
import scala.collection.mutable.ListBuffer import java.util.Date import java.util.concurrent.Future import org.infinispan.api.{BasicCache, BasicCacheContainer} import org.infinispan.manager.DefaultCacheManager import org.infinispan.client.hotrod.RemoteCacheManager import org.infinispan.util.concurrent.{FutureListener, NotifyingFuture} object AsyncOperation { def createManager(mode: String): BasicCacheContainer = mode match { case "e" => new DefaultCacheManager case "r" => new RemoteCacheManager("localhost") } def main(args: Array[String]): Unit = { val manager = createManager(args.toList.headOption.getOrElse("r")) println(s"use [${manager.getClass.getName}]") val cache: BasicCache[String, String] = manager.getCache() val bigString = (0 to 10000000) .foldLeft(new StringBuilder) { (b, i) => b ++= i.toString }.toString val log = (msg: String) => println(s"[${new Date}] $msg") log("Created Data Size[%1$,3d]".format(bigString.getBytes("UTF-8").size)) val futures = new ListBuffer[NotifyingFuture[String]] log("Put Async Cache Entries Start") futures += cache.putAsync("key1", bigString) futures += cache.putAsync("key2", bigString) log("Put Async Cache Entries End") futures.foreach { f => log(s"Future Done. [${f.get}]") } futures.clear() log("Get Async Cache Entries Start") futures += cache.getAsync("key1") futures += cache.getAsync("key2") log("Get Async Cache Entries Start") futures.foreach { f => val s = f.get log("Future Done. Size = [%1$,3d]".format(s.getBytes("UTF-8").size)) } log("Program End") } }
ちょっと大きな文字列を作って
val bigString = (0 to 10000000) .foldLeft(new StringBuilder) { (b, i) => b ++= i.toString }.toString
putAsyncしたり
log("Put Async Cache Entries Start") futures += cache.putAsync("key1", bigString) futures += cache.putAsync("key2", bigString) log("Put Async Cache Entries End") futures.foreach { f => log(s"Future Done. [${f.get}]") }
getAsyncしたりしています。
log("Get Async Cache Entries Start") futures += cache.getAsync("key1") futures += cache.getAsync("key2") log("Get Async Cache Entries Start") futures.foreach { f => val s = f.get log("Future Done. Size = [%1$,3d]".format(s.getBytes("UTF-8").size)) }
戻り値のFutureは、待ち合わせに使っています。
では、実行。
> run [info] Running AsyncOperation [error] 3 09, 2013 7:17:39 午後 org.infinispan.client.hotrod.RemoteCacheManager start [error] INFO: ISPN004021: Infinispan version: Infinispan 'Delirium' 5.2.1.Final [info] use [org.infinispan.client.hotrod.RemoteCacheManager] [info] [Sat Mar 09 19:17:42 JST 2013] Created Data Size[68,888,898] [info] [Sat Mar 09 19:17:42 JST 2013] Put Async Cache Entries Start [info] [Sat Mar 09 19:17:42 JST 2013] Put Async Cache Entries End [info] [Sat Mar 09 19:17:46 JST 2013] Future Done. [null] [info] [Sat Mar 09 19:17:47 JST 2013] Future Done. [null] [info] [Sat Mar 09 19:17:47 JST 2013] Get Async Cache Entries Start [info] [Sat Mar 09 19:17:47 JST 2013] Get Async Cache Entries Start [info] [Sat Mar 09 19:17:55 JST 2013] Future Done. Size = [68,888,898] [info] [Sat Mar 09 19:18:00 JST 2013] Future Done. Size = [68,888,898] [info] [Sat Mar 09 19:18:00 JST 2013] Program End [success] Total time: 29 s, completed 2013/03/09 19:18:06
putAsyncとgetAsyncは一瞬で終わっていますが、その後のFuture#getで時間がかかっていますね。
ちなみに、Embedded Cacheでやるとすごく高速に終わります。…そりゃあ、そうですよね。
> run e [info] Running AsyncOperation e [info] use [org.infinispan.manager.DefaultCacheManager] [error] 3 09, 2013 7:18:46 午後 org.infinispan.factories.GlobalComponentRegistry start [error] INFO: ISPN000128: Infinispan version: Infinispan 'Delirium' 5.2.1.Final [error] 3 09, 2013 7:18:47 午後 org.infinispan.jmx.CacheJmxRegistration start [error] INFO: ISPN000031: MBeans were successfully registered to the platform MBean server. [info] [Sat Mar 09 19:18:51 JST 2013] Created Data Size[68,888,898] [info] [Sat Mar 09 19:18:51 JST 2013] Put Async Cache Entries Start [info] [Sat Mar 09 19:18:51 JST 2013] Put Async Cache Entries End [info] [Sat Mar 09 19:18:51 JST 2013] Future Done. [null] [info] [Sat Mar 09 19:18:51 JST 2013] Future Done. [null] [info] [Sat Mar 09 19:18:51 JST 2013] Get Async Cache Entries Start [info] [Sat Mar 09 19:18:51 JST 2013] Get Async Cache Entries Start [info] [Sat Mar 09 19:18:52 JST 2013] Future Done. Size = [68,888,898] [info] [Sat Mar 09 19:18:53 JST 2013] Future Done. Size = [68,888,898] [info] [Sat Mar 09 19:18:53 JST 2013] Program End [success] Total time: 14 s, completed 2013/03/09 19:18:53
その他、org.infinispan.util.concurrent.FutureListenerインターフェースを使用することで、Futureの完了イベントを監視することもできるようなのですが…ちょっとうまく動作させられなかったので、今回はパスです…。