CLOVER🍀

That was when it all began.

InfinispanのCache#putForExternalReadって、何?

InfinispanのCacheインターフェースのメソッドを見てたりとか、ドキュメントを見てたりした時にたまに見かけていたCache#putForExternalReadメソッド。ちょっと気になっていたので、使ってみました。

Using the Cache API - putForExternalRead operation
https://docs.jboss.org/author/display/ISPN/Using+the+Cache+API#UsingtheCacheAPI-{{putForExternalRead}}operation

これを読んでみると…こういうこと?

  • Infinispanがデータのキャッシュとして使用されている場合に特に有効なのでは?
  • データの作成、読み出しに時間がかかるようなデータを格納するのに使用するとよい
  • そのようなデータを作成する時には、トランザクションが遅延してしまうし最適化もしづらい

putForExtenalReadlメソッドの特性としては、Javadocと合わせて読むとこういうことらしいです
http://docs.jboss.org/infinispan/5.2/apidocs/org/infinispan/Cache.html#putForExternalRead%28K,%20V%29

  • ConcurrentMap#putIfAbsentによく似た動きをする。キーに対する値がなかった場合は処理を行うが、そうでない場合は何もしない
  • 強制的に非同期モードになり、レプリケーションをブロックしない
  • ロックは取らず、TimeoutExceptionもスローされない
  • トランザクションが走っていたら、いったん中断する。トランザクションに影響を与えない
  • エラー、例外は黙殺する。このメソッドは、例外を投げない

データを、外部ストレージに格納するようなイメージのものってことだそうで。

これを使うシーンの例としては、レガシーなデータストアからデータを読み出す時に使用するといいんではないでしょうか、みたいなことが書かれています。

使ってみる

では、まずは簡単に試してみます。

    val manager = new DefaultCacheManager
    val cache = manager.getCache[String, String]()

    cache.putForExternalRead("key-ext", "value-ext")
    println(s"putForExternalRead: key-ext => ${cache.get("key-ext")}")

    cache.putForExternalRead("key-ext", "value updated updated")
    println(s"putForExternalRead Change: key-ext => ${cache.get("key-ext")}")

これを動かしてみると、

> run
[info] putForExternalRead: key-ext => value-ext
[info] putForExternalRead Change: key-ext => value-ext

なるほど、確かに1度putForExternalReadで入れると上書きできませんね。

もう少し、バリエーションを見てみましょう。

    val manager = new DefaultCacheManager
    val cache = manager.getCache[String, String]()

    cache.putForExternalRead("key-ext", "value-ext")
    println(s"putForExternalRead: key-ext => ${cache.get("key-ext")}")

    cache.putForExternalRead("key-ext", "value updated updated")
    println(s"putForExternalRead Change: key-ext => ${cache.get("key-ext")}")

    cache.put("key-ext", "value updated")
    println(s"put: key-ext => ${cache.get("key-ext")}")

    cache.putForExternalRead("key-ext", "value updated updated")
    println(s"putForExternalRead Change: key-ext => ${cache.get("key-ext")}")

    cache.evict("key-ext")
    println(s"evict: key-ext => ${cache.get("key-ext")}")

    cache.putForExternalRead("key-ext", "value-ext")
    println(s"putForExternalRead: key-ext => ${cache.get("key-ext")}")

    cache.remove("key-ext")
    println(s"remove: key-ext => ${cache.get("key-ext")}")

結果は、こうなりました。

> run
[info] putForExternalRead: key-ext => value-ext
[info] putForExternalRead Change: key-ext => value-ext
[info] put: key-ext => value updated
[info] putForExternalRead Change: key-ext => value updated
[info] evict: key-ext => null
[info] putForExternalRead: key-ext => value-ext
[info] remove: key-ext => null

putForExternalReadでは値を変更できませんが、putを使うと変更できてしまう。また、evictされたりremoveされたりすると、やっぱりなくなっちゃうってことですね。
この他、expirationを設定して有効期限切れした場合も、同様にされることが確認できました。

JTAと組み合わせる

トランザクションの範囲外にいくよって書いているので、こちらも試してみました。

    val manager = new DefaultCacheManager
    manager.defineConfiguration("jta-cache",
                                new ConfigurationBuilder()
                                  .transaction
                                  .transactionMode(TransactionMode.TRANSACTIONAL)
                                  .build)
    val cache = manager.getCache[String, String]("jta-cache")

    val tm = cache.getAdvancedCache.getTransactionManager

    tm.begin()

    (1 to 3) foreach (i => cache.put(s"key${i}", s"value${i}"))
    cache.putForExternalRead("ext-key", "ext-value")

    tm.rollback()

    (1 to 3) foreach (i => println(s"key${i} => ${cache.get(s"key$i")}")) 
    println(cache.get("ext-key"))

トランザクション中でエントリを3つ放り込んでいますが、最後にロールバックしています。ただ、その間に1つputExternalReadでエントリを追加しています。

では、実行。

> run
[info] key1 => null
[info] key2 => null
[info] key3 => null
[info] ext-value

トランザクション中に登録したデータはロールバックしていますが、putExternalForReadで登録したものはそのまま残っていますね。

せっかくなので、この部分のソースも眺めてみました。
https://github.com/infinispan/infinispan/blob/master/core/src/main/java/org/infinispan/CacheImpl.java

   @Override
   public final void putForExternalRead(K key, V value) {
      putForExternalRead(key, value, null, null);
   }

   final void putForExternalRead(K key, V value, EnumSet<Flag> explicitFlags, ClassLoader explicitClassLoader) {
      Transaction ongoingTransaction = null;
      try {
         ongoingTransaction = getOngoingTransaction();
         if (ongoingTransaction != null)
            transactionManager.suspend();

         EnumSet<Flag> flags = EnumSet.of(FAIL_SILENTLY, FORCE_ASYNCHRONOUS, ZERO_LOCK_ACQUISITION_TIMEOUT, PUT_FOR_EXTERNAL_READ);
         if (explicitFlags != null && !explicitFlags.isEmpty()) {
            flags.addAll(explicitFlags);
         }

         // if the entry exists then this should be a no-op.
         putIfAbsent(key, value, defaultLifespan, TimeUnit.MILLISECONDS, defaultMaxIdleTime, TimeUnit.MILLISECONDS, flags, explicitClassLoader);
      } catch (Exception e) {
         if (log.isDebugEnabled()) log.debug("Caught exception while doing putForExternalRead()", e);
      }
      finally {
         try {
            if (ongoingTransaction != null) transactionManager.resume(ongoingTransaction);
         } catch (Exception e) {
            if (log.isDebugEnabled())
               log.debug("Had problems trying to resume a transaction after putForExternalRead()", e);
         }
      }
   }

確かに、ドキュメントに書かれているような挙動をしそうですね。

最後に、今回作成したコードを載せておきます。

import org.infinispan.configuration.cache.ConfigurationBuilder
import org.infinispan.manager.{DefaultCacheManager, EmbeddedCacheManager}
import org.infinispan.transaction.TransactionMode

object PutForExternalReadExample {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager
    try {
      simply(manager)
      withJTA(manager)
    } finally {
      manager.stop
    }
  }

  def simply(manager: EmbeddedCacheManager): Unit = {
    val cache = manager.getCache[String, String]()

    cache.putForExternalRead("key-ext", "value-ext")

    println(s"putForExternalRead: key-ext => ${cache.get("key-ext")}")

    cache.putForExternalRead("key-ext", "value updated updated")
    println(s"putForExternalRead Change: key-ext => ${cache.get("key-ext")}")

    cache.put("key-ext", "value updated")
    println(s"put: key-ext => ${cache.get("key-ext")}")

    cache.putForExternalRead("key-ext", "value updated updated")
    println(s"putForExternalRead Change: key-ext => ${cache.get("key-ext")}")

    cache.evict("key-ext")
    println(s"evict: key-ext => ${cache.get("key-ext")}")

    cache.putForExternalRead("key-ext", "value-ext")
    println(s"putForExternalRead: key-ext => ${cache.get("key-ext")}")

    cache.remove("key-ext")
    println(s"remove: key-ext => ${cache.get("key-ext")}")
  }

  def withJTA(manager: EmbeddedCacheManager): Unit = {
    manager.defineConfiguration("jta-cache",
                                new ConfigurationBuilder()
                                  .transaction
                                  .transactionMode(TransactionMode.TRANSACTIONAL)
                                  .build)
    val cache = manager.getCache[String, String]("jta-cache")

    val tm = cache.getAdvancedCache.getTransactionManager

    tm.begin()

    (1 to 3) foreach (i => cache.put(s"key${i}", s"value${i}"))
    cache.putForExternalRead("ext-key", "ext-value")

    tm.rollback()

    (1 to 3) foreach (i => println(s"key${i} => ${cache.get(s"key$i")}")) 
    println(cache.get("ext-key"))
  }
}