CLOVER🍀

That was when it all began.

InfinispanのAdvancedCacheLoader/AdvancedCacheWriterを使ってみる

前回、InfinispanのCacheLoaderとCacheWriterを実装して、簡単なCacheStoreを作ってみました。

InfinispanのCacheLoader/CacheWriterを使ってみる
http://d.hatena.ne.jp/Kazuhira/20140802/1406997368

こちらの続きです。

CacheLoader/CacheWriterの実装を用意しただけでは、fetchPersistentState、preload、purgeOnStartupは使用できませんでした。今回、AdvancedCacheLoader/AdvancedCacheWriterの実装を加えることで、これらを使えるようにしてみたいと思います。

ベースの実装は、前回のエントリに足していく形になるので、そちらを参照ください…。

AdvancedCacheLoader/AdvancedCacheWriterを実装する

では、前回作成したCacheStoreにAdvancedCacheLoaderおよびAdvancedCacheWriterインターフェースを実装するよう追加します。

今回、import文がこんな感じになりました。

import scala.collection._

import java.io._
import java.nio.file.{Files, Paths}
import java.util.concurrent.{Callable, Executor}

import org.infinispan.commons.io.ByteBuffer
import org.infinispan.executors.ExecutorAllCompletionService
import org.infinispan.marshall.core.MarshalledEntry
import org.infinispan.metadata.InternalMetadata
// import org.infinispan.persistence.spi.{CacheLoader, CacheWriter, ExternalStore, InitializationContext}
import org.infinispan.persistence.spi.{AdvancedCacheLoader, AdvancedCacheWriter, AdvancedLoadWriteStore, InitializationContext, PersistenceException}
import org.infinispan.persistence.{PersistenceUtil, TaskContextImpl}

クラスの宣言は、このような形に。

class SimpleMapCacheStore[K, V] extends AdvancedCacheLoader[K, V] with AdvancedCacheWriter[K, V] {
// class SimpleMapCacheStore[K, V] extends AdvancedLoadWriteStore[K, V] {  // こちらでも可

AdvancedCacheLoaderとAdvancedCacheWriterをそれぞれ実装してもよいですが、それらを合わせたAdvancedLoadWriteStoreインターフェースを実装するのもOKです。

AdvancedCacheLoaderインターフェースを実装したことにより、process、sizeメソッドを、AdvancedCacheWriterを実装したことにより、clear、purgeメソッドをそれぞれ実装する必要があります。

また、AdvancedCacheLoaderはCacheLoaderを、AdvancedCacheWriterはCacheWriterをぞれぞれ拡張したものなので、それらが持っていたメソッドも実装する必要があります。

あら、大変。なお、ConfigurationBuilderはそのまま使用します。

まずは、AdvancedCacheLoader側のメソッドから定義しましょう。
processメソッドですが、このように実装しました。

  // from AdvancedCacheLoader
  override def process(filter: AdvancedCacheLoader.KeyFilter[K],
                       task: AdvancedCacheLoader.CacheLoaderTask[K, V],
                       executor: Executor,
                       fetchValue: Boolean,
                       fetchMetadata: Boolean): Unit = {
    println(s"Store[$storeName] process, fetchValue[$fetchValue], fetchMetadata[$fetchMetadata]")

    val filterOrLoadAll =
      PersistenceUtil.notNull(filter).asInstanceOf[AdvancedCacheLoader.KeyFilter[K]]

    val eacs = new ExecutorAllCompletionService(executor)
    val taskContext = new TaskContextImpl

    store
      .keys
      .withFilter { k =>
          filterOrLoadAll.shouldLoadKey(k)
      }
      .foreach { k =>
        if (!taskContext.isStopped) {
          eacs.submit(new Callable[Void] {
            @throws(classOf[Exception])
            override def call(): Void = {
              val marshalledEntry = load(k, fetchValue, fetchMetadata)

              if (marshalledEntry != null) {
                println(s"Store[$storeName] loaded key[$k], on process")
                task.processEntry(marshalledEntry, taskContext)
              }

              null
            }
          })
        }
      }

    eacs.waitUntilAllCompleted()

    if (eacs.isExceptionThrown()) {
      throw new PersistenceException("Execution exception!", eacs.getFirstException());
    }
  }

processメソッドは、起動時やアクセス時など、けっこう随所で呼び出されます。今回は、SingleFileStoreの実装を参考にして作成しました。

引数で渡されたスレッドプールを使用してExecutorAllCompletionServiceを作成し、自Nodeに割り当てられたキーに対応する情報をロードします。

自NodeかどうかはAdvancedCacheLoader.KeyFilterで判定するのですが、こちらがnullの場合は全キーを対象にするようにPersistenceUtil.notNullで全キーを対象とするようなKeyFilterのインスタンスを受け取るようにしています。

fetchValueやfetchMetadataの引数は、Cache#getや起動時にCacheStoreのpreloadやfetchPersistentStateなどを有効にしておいたりするとtrueで呼び出されます。それ以外の時は、キーのロードのみ(Cacheのエントリ数の把握のため?)な感じでしょうか。

あと、余談としてCallableをVoidでパラメータ適用して、

          eacs.submit(new Callable[Void] {

メソッドの最後でnullを返していますが、これはScalaのUnitをここに書けず…(ExecutorAllCompletionService#submitがCallable[Void]を要求しているため)、仕方なくこのような形になりました。

続いて、sizeメソッド。今回は、単純に内部で持っているMapのサイズを返却するようにしました。

  // from AdvancedCacheLoader
  override def size: Int = {
    println(s"Store[$storeName] size")
    store.size
  }

続いて、AdvancedCacheWriter側のメソッド。
clearメソッドは、単純に内部で持っているMapをクリアするように実装しました。

  // from AdvancedCacheWriter
  override def clear(): Unit = {
    println(s"Store[$storeName] clear")
    store.clear()
  }

このメソッドを実装することで、Cache#clearにStoreもついてくることができるようになります。

そして、purgeメソッド。

  // from AdvancedCacheWriter
  override def purge(executor: Executor,
                     listener: AdvancedCacheWriter.PurgeListener[_]): Unit = {
    println(s"Store[$storeName] purge")

    executor.execute(new Runnable {
      override def run(): Unit = {
        val now = System.currentTimeMillis
        val marshaller = ctx.getMarshaller
        val factory = ctx.getByteBufferFactory
        val marshalledEntryFactory = ctx.getMarshalledEntryFactory

        store.foreach { case (k, (v, m)) =>
          val (kb, vb, mb) =
            (marshaller.objectToByteBuffer(k), marshaller.objectToByteBuffer(v), m)
          val (keyBuffer, valueBuffer, metaBuffer) =
            (factory.newByteBuffer(kb, 0, kb.size),
             factory.newByteBuffer(vb, 0, vb.size),
             factory.newByteBuffer(mb, 0, mb.size))

          val marshalledEntry =
            marshalledEntryFactory
              .newMarshalledEntry(keyBuffer, valueBuffer, metaBuffer)

          if (marshalledEntry.getMetadata.isExpired(now)) {
            println(s"Store[$storeName] purge key[$k]")
            store -= k

            if (listener != null) {
              println(s"Store[$storeName] listner nofiticate, key[$k]")
              val task =
                listener.asInstanceOf[AdvancedCacheWriter.PurgeListener[K]]
              task.entryPurged(k)
            }
          }
        }
      }
    })
  }

こちらは、有効期限切れのエントリを削除する場合に呼び出されます。expirationのwakeUpIntervalを短い値で調整すると、動作しているのを見ることができるでしょう。

こちらもExecutorを用いて実装し、エントリをパージした場合はAdvancedCacheWriter.PurgeListener#entryPurgedメソッドを呼び出して通知します。

また、このように有効期限切れを意識した関係上、CacheLoaderとして実装するloadメソッドの中身も、有効期限を見るように変更しておきました。

  // from CacheLoader
  override def load(key: K): MarshalledEntry[K, V] =
    load(key, true, true)

  def load(key: K, fetchValue: Boolean, fetchMetadata: Boolean): MarshalledEntry[K, V] = {
    println(s"Store[$storeName] try load key[$key], fetchValue[$fetchValue], fetchMetadata[$fetchMetadata]")

    val value = store.get(key)

    val marshaller = ctx.getMarshaller

    val binaryKey =  marshaller.objectToByteBuffer(key)

    val factory = ctx.getByteBufferFactory
    val keyBuffer = factory.newByteBuffer(binaryKey, 0, binaryKey.size)

    value match {
      case Some((v, m)) =>
        val valueBuffer =
          if (fetchValue) {
            val binaryValue = marshaller.objectToByteBuffer(v)
            factory.newByteBuffer(binaryValue, 0, binaryValue.size)
          } else {
            null
          }

        val metaBuffer =
          if (fetchMetadata && m != null)
            factory.newByteBuffer(m, 0, m.size)
          else
            null

        val marshalledEntry =
          ctx
            .getMarshalledEntryFactory
            .newMarshalledEntry(keyBuffer, valueBuffer, metaBuffer)
            .asInstanceOf[MarshalledEntry[K, V]]

        val now = System.currentTimeMillis
        val metadata = marshalledEntry.getMetadata

        if (metadata != null && metadata.isExpired(now)) {
          println(s"Store[$storeName] loaded, but expire entry, key[$key]")
          store -= key
          null
        } else {
          marshalledEntry
        }
      case None =>
        println(s"Store[$storeName] missing key[$key]")
        null
    }
  }

テストコード

では、動作確認を。前回のテストコードとそんなに変わらないので、断片だけ。

  describe("simple map cache store spec") {
    it("persistence") {
      val numInstances = 2

      SimpleMapCacheStore.instances(numInstances)
      withStoreCache[String, String]("storeCache", numInstances) { cache =>
        println(s"Cache size => ${cache.size}")

        val range = 1 to 10

        range.foreach(i => println(s"key[${s"key$i"}] => ${cache.get(s"key$i")}"))

        (1 to 10) foreach (i => cache.put(s"key$i", s"value$i"))

        /*
        println("Wait...")
        Thread.sleep(5000L)

        range.foreach(i => println(s"key[${s"key$i"}] => ${cache.get(s"key$i")}"))
        */
      }
    }
  }

コメントアウトしている箇所は、有効期限切れの確認などに。

Cacheの設定は、こんな感じで。

      val persistenceBuilder = new ConfigurationBuilder()
        .clustering
        .cacheMode(CacheMode.DIST_SYNC)
        .expiration
        // .maxIdle(3000L)
        // .wakeUpInterval(500L)
        .persistence

      manager.defineConfiguration(cacheName,
                                  persistenceBuilder
                                    .addStore(new SimpleMapCacheStoreConfigurationBuilder(persistenceBuilder))
                                    .fetchPersistentState(false)
                                    .preload(false)
                                    .shared(false)
                                    .purgeOnStartup(false)
                                    .ignoreModifications(false)
                                    .build)

コメントアウトしている箇所は、やっぱり有効期限切れの確認に利用。

とりあえず、ほとんどの設定項目をfalseにしていますが、1度動かしてみます。永続化したエントリは存在しない、空っぽの状態で。

> test
Store Name = store-1.dmp
Store[store-1.dmp], not exists.
Store Name = store-2.dmp
Store[store-2.dmp], not exists.
Store[store-1.dmp] process, fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] process, fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] size
Cache size => 0
Store[store-1.dmp] try load key[key1], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key1]
key[key1] => null
Store[store-1.dmp] try load key[key2], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key2]
key[key2] => null
Store[store-1.dmp] try load key[key3], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key3]
key[key3] => null
Store[store-1.dmp] try load key[key4], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key4]
key[key4] => null
Store[store-1.dmp] try load key[key5], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key5]
key[key5] => null
Store[store-1.dmp] try load key[key6], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key6]
key[key6] => null
Store[store-1.dmp] try load key[key7], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key7]
key[key7] => null
Store[store-1.dmp] try load key[key8], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key8]
key[key8] => null
Store[store-1.dmp] try load key[key9], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key9]
key[key9] => null
Store[store-1.dmp] try load key[key10], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key10]
key[key10] => null
Store[store-1.dmp] try load key[key1], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key1]
Store[store-2.dmp] write (key, value) = (key1, value1)
Store[store-1.dmp] write (key, value) = (key1, value1)
Store[store-1.dmp] try load key[key2], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key2]
Store[store-2.dmp] write (key, value) = (key2, value2)
Store[store-1.dmp] write (key, value) = (key2, value2)
Store[store-1.dmp] try load key[key3], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key3]
Store[store-2.dmp] write (key, value) = (key3, value3)
Store[store-1.dmp] write (key, value) = (key3, value3)
Store[store-1.dmp] try load key[key4], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key4]
Store[store-2.dmp] write (key, value) = (key4, value4)
Store[store-1.dmp] write (key, value) = (key4, value4)
Store[store-1.dmp] try load key[key5], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key5]
Store[store-1.dmp] write (key, value) = (key5, value5)
Store[store-2.dmp] write (key, value) = (key5, value5)
Store[store-1.dmp] try load key[key6], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key6]
Store[store-2.dmp] write (key, value) = (key6, value6)
Store[store-1.dmp] write (key, value) = (key6, value6)
Store[store-1.dmp] try load key[key7], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key7]
Store[store-2.dmp] write (key, value) = (key7, value7)
Store[store-1.dmp] write (key, value) = (key7, value7)
Store[store-1.dmp] try load key[key8], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key8]
Store[store-2.dmp] write (key, value) = (key8, value8)
Store[store-1.dmp] write (key, value) = (key8, value8)
Store[store-1.dmp] try load key[key9], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key9]
Store[store-1.dmp] write (key, value) = (key9, value9)
Store[store-2.dmp] write (key, value) = (key9, value9)
Store[store-1.dmp] try load key[key10], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] missing key[key10]
Store[store-1.dmp] write (key, value) = (key10, value10)
Store[store-2.dmp] write (key, value) = (key10, value10)
Store[store-1.dmp] saved, keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3)
Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] saved, keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3)

起動時に、ちらちらprocessメソッドが呼び出されています。あ、sizeにも反応していますね。

で、2回目実行してみると、やたらといろいろ出るようになります。

> test
Store Name = store-1.dmp
Loaded From Store[store-1.dmp], keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3)
Store Name = store-2.dmp
Loaded From Store[store-2.dmp], keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3)
Store[store-1.dmp] process, fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] try load key[key6], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] try load key[key9], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] try load key[key8], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] try load key[key5], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] try load key[key2], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key6], on process
Store[store-1.dmp] loaded key[key2], on process
Store[store-1.dmp] loaded key[key5], on process
Store[store-1.dmp] loaded key[key9], on process
Store[store-1.dmp] loaded key[key8], on process
Store[store-2.dmp] try load key[key6], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] try load key[key9], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] try load key[key8], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] try load key[key5], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] try load key[key3], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key3], on process
Store[store-2.dmp] loaded key[key6], on process
Store[store-2.dmp] loaded key[key9], on process
Store[store-2.dmp] loaded key[key5], on process
Store[store-2.dmp] try load key[key1], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key1], on process
Store[store-2.dmp] try load key[key10], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key10], on process
Store[store-2.dmp] try load key[key3], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key3], on process
Store[store-2.dmp] try load key[key7], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key7], on process
Store[store-2.dmp] try load key[key4], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key4], on process
Store[store-2.dmp] loaded key[key8], on process
Store[store-1.dmp] try load key[key10], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key10], on process
Store[store-1.dmp] try load key[key1], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key1], on process
Store[store-1.dmp] try load key[key7], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key7], on process
Store[store-1.dmp] try load key[key4], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key4], on process
Store[store-2.dmp] try load key[key2], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key2], on process
Store[store-1.dmp] process, fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] try load key[key5], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] try load key[key5], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key5], on process
Store[store-2.dmp] try load key[key4], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key4], on process
Store[store-2.dmp] try load key[key7], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key7], on process
Store[store-2.dmp] try load key[key1], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key1], on process
Store[store-2.dmp] try load key[key10], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key10], on process
Store[store-2.dmp] try load key[key3], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key3], on process
Store[store-1.dmp] try load key[key6], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key6], on process
Store[store-1.dmp] try load key[key9], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key9], on process
Store[store-1.dmp] try load key[key8], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key8], on process
Store[store-1.dmp] try load key[key2], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key2], on process
Store[store-2.dmp] try load key[key6], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key6], on process
Store[store-2.dmp] try load key[key9], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key9], on process
Store[store-2.dmp] try load key[key8], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key8], on process
Store[store-2.dmp] try load key[key2], fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] loaded key[key2], on process
Store[store-1.dmp] size
Cache size => 10
Store[store-1.dmp] try load key[key1], fetchValue[true], fetchMetadata[true]
key[key1] => value1
Store[store-1.dmp] loaded key[key5], on process
Store[store-1.dmp] try load key[key4], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key4], on process
Store[store-1.dmp] try load key[key10], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key10], on process
Store[store-1.dmp] try load key[key3], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key3], on process
Store[store-1.dmp] try load key[key7], fetchValue[false], fetchMetadata[false]
Store[store-1.dmp] loaded key[key7], on process
Store[store-1.dmp] try load key[key2], fetchValue[true], fetchMetadata[true]
key[key2] => value2
Store[store-1.dmp] try load key[key3], fetchValue[true], fetchMetadata[true]
key[key3] => value3
Store[store-1.dmp] try load key[key4], fetchValue[true], fetchMetadata[true]
key[key4] => value4
Store[store-1.dmp] try load key[key5], fetchValue[true], fetchMetadata[true]
key[key5] => value5
Store[store-1.dmp] try load key[key6], fetchValue[true], fetchMetadata[true]
key[key6] => value6
Store[store-1.dmp] try load key[key7], fetchValue[true], fetchMetadata[true]
key[key7] => value7
Store[store-1.dmp] try load key[key8], fetchValue[true], fetchMetadata[true]
key[key8] => value8
Store[store-1.dmp] try load key[key9], fetchValue[true], fetchMetadata[true]
key[key9] => value9
Store[store-1.dmp] try load key[key10], fetchValue[true], fetchMetadata[true]
key[key10] => value10
Store[store-2.dmp] write (key, value) = (key1, value1)
Store[store-1.dmp] write (key, value) = (key1, value1)
Store[store-2.dmp] write (key, value) = (key2, value2)
Store[store-1.dmp] write (key, value) = (key2, value2)
Store[store-2.dmp] write (key, value) = (key3, value3)
Store[store-1.dmp] write (key, value) = (key3, value3)
Store[store-2.dmp] write (key, value) = (key4, value4)
Store[store-1.dmp] write (key, value) = (key4, value4)
Store[store-1.dmp] try load key[key5], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] write (key, value) = (key5, value5)
Store[store-2.dmp] write (key, value) = (key5, value5)
Store[store-2.dmp] write (key, value) = (key6, value6)
Store[store-1.dmp] write (key, value) = (key6, value6)
Store[store-2.dmp] write (key, value) = (key7, value7)
Store[store-1.dmp] write (key, value) = (key7, value7)
Store[store-2.dmp] write (key, value) = (key8, value8)
Store[store-1.dmp] write (key, value) = (key8, value8)
Store[store-1.dmp] try load key[key9], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] write (key, value) = (key9, value9)
Store[store-2.dmp] write (key, value) = (key9, value9)
Store[store-1.dmp] try load key[key10], fetchValue[true], fetchMetadata[true]
Store[store-1.dmp] write (key, value) = (key10, value10)
Store[store-2.dmp] write (key, value) = (key10, value10)
Store[store-1.dmp] saved, keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3)
Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false]
Store[store-2.dmp] saved, keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3)

最初に、processメソッドでキーの情報だけをロードしている模様で。

まあ、こうなるならstartメソッドでキー情報全部抜かなくても、processでもいい気がしますね。JdbcStoreは、そのような実装になっています。SingleFileStoreは、start時ですが。

ちなみに、前回は特に書いていませんでしたが、上記の設定だとこんなログが出ていました。

8 03, 2014 7:03:27 午後 org.infinispan.configuration.cache.AbstractStoreConfigurationBuilder validate
WARN: ISPN000149: Fetch persistent state and purge on startup are both disabled, cache may contain stale entries on startup
8 03, 2014 7:03:27 午後 org.infinispan.configuration.cache.AbstractStoreConfigurationBuilder validate
WARN: ISPN000149: Fetch persistent state and purge on startup are both disabled, cache may contain stale entries on startup
8 03, 2014 7:03:27 午後 org.infinispan.configuration.cache.AbstractStoreConfigurationBuilder validate
WARN: ISPN000149: Fetch persistent state and purge on startup are both disabled, cache may contain stale entries on startup
8 03, 2014 7:03:27 午後 org.infinispan.configuration.cache.AbstractStoreConfigurationBuilder validate
WARN: ISPN000149: Fetch persistent state and purge on startup are both disabled, cache may contain stale entries on startup

fetchPersistentStateとpurgeOnStartupの両方が無効になっているから、起動時に古いエントリを読む可能性があるよってことを言っているみたいです。

で、これを踏まえて残りの設定項目を見ていくと…。

purgeOnStartup

purgeOnStarupをtrueにすると、起動時にAdvancedCacheWriterのclearメソッドが呼び出され、CacheStoreを空にしようとします。

preload

preloadをtrueにすると、起動時にAdvancedCacheLoaderのprocessメソッドが、fetchValueおよびfetchMetadataがtrueで呼び出され、エントリの情報を値、メタデータ含めてロードしにいきます。この場合、Cache#getを使用した場合には、先の例のようにCache#get時にCacheLoader#loadメソッドは呼び出さなくなるようです。

起動時にあらかじめロードしておきたい場合は便利なようですが、Nodeにローカルに持ってしまうところが注意事項だとか。

また、片方のNode(のStore)でロードした後、もう片方のNode(のStore)に書き込みにいくような挙動も見せましたが、これは単純に書き込みイベントが発生したからでしょうか…。

fetchPersistentState

fetchPersistentStateをtrueにすると、Cacheを操作するまでにprocessメソッドがより多く呼び出されるような感じの挙動になりました。この時、fetchValueおよびfetchMetadataがtrueの呼び出しも入ります。最初は、falseで呼び出されますが。

ロード処理が入ることになるので、ちゃんと有効期限のチェックをloadで入れておけば、不要エントリの廃棄もここで可能なようです。
*これは、preloadでも可能な気がしますけど

でまあ、やっぱりよくわからなかったので、使われていると思われる箇所を探してみました。以下がそれっぽいですね。

OutboundTransferTask
https://github.com/infinispan/infinispan/blob/6.0.2.Final/core/src/main/java/org/infinispan/statetransfer/OutboundTransferTask.java#L149

クラスタ参加時に、参加したNodeの持つエントリの情報を送信しているようです。で、Storeにあったエントリが更新されていた場合は、その情報は遅れて起動した側のStoreへの書き込み操作が発生して取り込めるようです。

それに、クラスタに参加するまでに削除されたエントリについてはさすがに反映されない模様。

まあ、挙動を見ていると、起動時のprocessメソッド呼び出し後に、少なくとも更新エントリを含めて、現存するエントリ情報が渡ってくるようなので、場合によっては入れた方がいいのかも?fetchPersistentStateを設定しなくても更新があったものについては受け取れるようですが、更新がないものについては遅れて起動した側のStoreには書き込みが行われませんでしたから…。

それで

fetchPersistentStateは理解が怪しいところがありますが、なんとなく挙動はつかめた感じがします。

また、今回は2Nodeでやりましたが、4つとかに増やしたりすると裏で激しくprocessメソッドが呼び出されたりするので、まだいろいろあるんだろうなぁと…。

とりあえず、今回更新したソースは、こちらに反映しておきました。
https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-persistence