CLOVER🍀

That was when it all began.

Infinispan Asynchronous API

久々にInfinispan。1ヵ月振りなので、簡単そうなものをちょっと触ってみました。

Asynchronous API
https://docs.jboss.org/author/display/ISPN/Asynchronous+API

読んで字のごとく、非同期APIです。

Cache#putやget、removeなどのメソッドにAsyncを付けることで、非同期化することができます。
http://docs.jboss.org/infinispan/5.2/apidocs/org/infinispan/api/BasicCache.html

Embedded Cache、Remote Cacheの共通の親インターフェースに定義されているので、どちらのキャッシュでも使用できます。

基本的には、

cache.putAsync("key1", "...")
cache.getAsync("key1")

みたいにするだけです。

各Asyncメソッドの戻り値としては、NotifyingFutureインターフェースを実装したクラスが戻ってきます。
http://docs.jboss.org/infinispan/5.0/apidocs/org/infinispan/util/concurrent/NotifyingFuture.html
こちらは、JDKのFutureインターフェースのサブインターフェースでもあるので、ふつうにFuture#getで待ち合わせをすることができます。

では、試してみましょう。

こういうのは、Remote Cacheで実行するのがわかりやすいだろうと思うので、まずはInfinispan ServerをHot Rodで起動。

$ bin/startServer.sh -r hotrod

用意したソースは、こんな感じです。
build.sbt

name := "infinispan-async"

version := "0.0.1"

scalaVersion := "2.10.0"

organization := "littlewings"

fork in run := true

javaOptions += "-Xmx1536M"

scalacOptions += "-deprecation"

resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/"

libraryDependencies += "org.infinispan" % "infinispan-client-hotrod" % "5.2.1.Final"

src/main/scala/AsyncOperation.scala

import scala.collection.mutable.ListBuffer

import java.util.Date
import java.util.concurrent.Future

import org.infinispan.api.{BasicCache, BasicCacheContainer}
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.client.hotrod.RemoteCacheManager
import org.infinispan.util.concurrent.{FutureListener, NotifyingFuture}

object AsyncOperation {
  def createManager(mode: String): BasicCacheContainer =
    mode match {
      case "e" => new DefaultCacheManager
      case "r" => new RemoteCacheManager("localhost")
    }

  def main(args: Array[String]): Unit = {
    val manager = createManager(args.toList.headOption.getOrElse("r"))
    println(s"use [${manager.getClass.getName}]")
    val cache: BasicCache[String, String] = manager.getCache()

    val bigString =
                (0 to 10000000)
                  .foldLeft(new StringBuilder) { (b, i) =>
                  b ++= i.toString
                }.toString

    val log = (msg: String) => println(s"[${new Date}] $msg")

    log("Created Data Size[%1$,3d]".format(bigString.getBytes("UTF-8").size))

    val futures = new ListBuffer[NotifyingFuture[String]]

    log("Put Async Cache Entries Start")

    futures += cache.putAsync("key1", bigString)
    futures += cache.putAsync("key2", bigString)

    log("Put Async Cache Entries End")

    futures.foreach { f =>
      log(s"Future Done. [${f.get}]")
    }

    futures.clear()

    log("Get Async Cache Entries Start")

    futures += cache.getAsync("key1")
    futures += cache.getAsync("key2")

    log("Get Async Cache Entries Start")

    futures.foreach { f =>
      val s = f.get
      log("Future Done. Size = [%1$,3d]".format(s.getBytes("UTF-8").size))
    }

    log("Program End")
  }
}

ちょっと大きな文字列を作って

    val bigString =
                (0 to 10000000)
                  .foldLeft(new StringBuilder) { (b, i) =>
                  b ++= i.toString
                }.toString

putAsyncしたり

    log("Put Async Cache Entries Start")

    futures += cache.putAsync("key1", bigString)
    futures += cache.putAsync("key2", bigString)

    log("Put Async Cache Entries End")

    futures.foreach { f =>
      log(s"Future Done. [${f.get}]")
    }

getAsyncしたりしています。

    log("Get Async Cache Entries Start")

    futures += cache.getAsync("key1")
    futures += cache.getAsync("key2")

    log("Get Async Cache Entries Start")

    futures.foreach { f =>
      val s = f.get
      log("Future Done. Size = [%1$,3d]".format(s.getBytes("UTF-8").size))
    }

戻り値のFutureは、待ち合わせに使っています。

では、実行。

> run
[info] Running AsyncOperation 
[error] 3 09, 2013 7:17:39 午後 org.infinispan.client.hotrod.RemoteCacheManager start
[error] INFO: ISPN004021: Infinispan version: Infinispan 'Delirium' 5.2.1.Final
[info] use [org.infinispan.client.hotrod.RemoteCacheManager]
[info] [Sat Mar 09 19:17:42 JST 2013] Created Data Size[68,888,898]
[info] [Sat Mar 09 19:17:42 JST 2013] Put Async Cache Entries Start
[info] [Sat Mar 09 19:17:42 JST 2013] Put Async Cache Entries End
[info] [Sat Mar 09 19:17:46 JST 2013] Future Done. [null]
[info] [Sat Mar 09 19:17:47 JST 2013] Future Done. [null]
[info] [Sat Mar 09 19:17:47 JST 2013] Get Async Cache Entries Start
[info] [Sat Mar 09 19:17:47 JST 2013] Get Async Cache Entries Start
[info] [Sat Mar 09 19:17:55 JST 2013] Future Done. Size = [68,888,898]
[info] [Sat Mar 09 19:18:00 JST 2013] Future Done. Size = [68,888,898]
[info] [Sat Mar 09 19:18:00 JST 2013] Program End
[success] Total time: 29 s, completed 2013/03/09 19:18:06

putAsyncとgetAsyncは一瞬で終わっていますが、その後のFuture#getで時間がかかっていますね。

ちなみに、Embedded Cacheでやるとすごく高速に終わります。…そりゃあ、そうですよね。

> run e
[info] Running AsyncOperation e
[info] use [org.infinispan.manager.DefaultCacheManager]
[error] 3 09, 2013 7:18:46 午後 org.infinispan.factories.GlobalComponentRegistry start
[error] INFO: ISPN000128: Infinispan version: Infinispan 'Delirium' 5.2.1.Final
[error] 3 09, 2013 7:18:47 午後 org.infinispan.jmx.CacheJmxRegistration start
[error] INFO: ISPN000031: MBeans were successfully registered to the platform MBean server.
[info] [Sat Mar 09 19:18:51 JST 2013] Created Data Size[68,888,898]
[info] [Sat Mar 09 19:18:51 JST 2013] Put Async Cache Entries Start
[info] [Sat Mar 09 19:18:51 JST 2013] Put Async Cache Entries End
[info] [Sat Mar 09 19:18:51 JST 2013] Future Done. [null]
[info] [Sat Mar 09 19:18:51 JST 2013] Future Done. [null]
[info] [Sat Mar 09 19:18:51 JST 2013] Get Async Cache Entries Start
[info] [Sat Mar 09 19:18:51 JST 2013] Get Async Cache Entries Start
[info] [Sat Mar 09 19:18:52 JST 2013] Future Done. Size = [68,888,898]
[info] [Sat Mar 09 19:18:53 JST 2013] Future Done. Size = [68,888,898]
[info] [Sat Mar 09 19:18:53 JST 2013] Program End
[success] Total time: 14 s, completed 2013/03/09 19:18:53

その他、org.infinispan.util.concurrent.FutureListenerインターフェースを使用することで、Futureの完了イベントを監視することもできるようなのですが…ちょっとうまく動作させられなかったので、今回はパスです…。