前回、InfinispanのCacheLoaderとCacheWriterを実装して、簡単なCacheStoreを作ってみました。
InfinispanのCacheLoader/CacheWriterを使ってみる
http://d.hatena.ne.jp/Kazuhira/20140802/1406997368
こちらの続きです。
CacheLoader/CacheWriterの実装を用意しただけでは、fetchPersistentState、preload、purgeOnStartupは使用できませんでした。今回、AdvancedCacheLoader/AdvancedCacheWriterの実装を加えることで、これらを使えるようにしてみたいと思います。
ベースの実装は、前回のエントリに足していく形になるので、そちらを参照ください…。
AdvancedCacheLoader/AdvancedCacheWriterを実装する
では、前回作成したCacheStoreにAdvancedCacheLoaderおよびAdvancedCacheWriterインターフェースを実装するよう追加します。
今回、import文がこんな感じになりました。
import scala.collection._ import java.io._ import java.nio.file.{Files, Paths} import java.util.concurrent.{Callable, Executor} import org.infinispan.commons.io.ByteBuffer import org.infinispan.executors.ExecutorAllCompletionService import org.infinispan.marshall.core.MarshalledEntry import org.infinispan.metadata.InternalMetadata // import org.infinispan.persistence.spi.{CacheLoader, CacheWriter, ExternalStore, InitializationContext} import org.infinispan.persistence.spi.{AdvancedCacheLoader, AdvancedCacheWriter, AdvancedLoadWriteStore, InitializationContext, PersistenceException} import org.infinispan.persistence.{PersistenceUtil, TaskContextImpl}
クラスの宣言は、このような形に。
class SimpleMapCacheStore[K, V] extends AdvancedCacheLoader[K, V] with AdvancedCacheWriter[K, V] { // class SimpleMapCacheStore[K, V] extends AdvancedLoadWriteStore[K, V] { // こちらでも可
AdvancedCacheLoaderとAdvancedCacheWriterをそれぞれ実装してもよいですが、それらを合わせたAdvancedLoadWriteStoreインターフェースを実装するのもOKです。
AdvancedCacheLoaderインターフェースを実装したことにより、process、sizeメソッドを、AdvancedCacheWriterを実装したことにより、clear、purgeメソッドをそれぞれ実装する必要があります。
また、AdvancedCacheLoaderはCacheLoaderを、AdvancedCacheWriterはCacheWriterをぞれぞれ拡張したものなので、それらが持っていたメソッドも実装する必要があります。
あら、大変。なお、ConfigurationBuilderはそのまま使用します。
まずは、AdvancedCacheLoader側のメソッドから定義しましょう。
processメソッドですが、このように実装しました。
// from AdvancedCacheLoader override def process(filter: AdvancedCacheLoader.KeyFilter[K], task: AdvancedCacheLoader.CacheLoaderTask[K, V], executor: Executor, fetchValue: Boolean, fetchMetadata: Boolean): Unit = { println(s"Store[$storeName] process, fetchValue[$fetchValue], fetchMetadata[$fetchMetadata]") val filterOrLoadAll = PersistenceUtil.notNull(filter).asInstanceOf[AdvancedCacheLoader.KeyFilter[K]] val eacs = new ExecutorAllCompletionService(executor) val taskContext = new TaskContextImpl store .keys .withFilter { k => filterOrLoadAll.shouldLoadKey(k) } .foreach { k => if (!taskContext.isStopped) { eacs.submit(new Callable[Void] { @throws(classOf[Exception]) override def call(): Void = { val marshalledEntry = load(k, fetchValue, fetchMetadata) if (marshalledEntry != null) { println(s"Store[$storeName] loaded key[$k], on process") task.processEntry(marshalledEntry, taskContext) } null } }) } } eacs.waitUntilAllCompleted() if (eacs.isExceptionThrown()) { throw new PersistenceException("Execution exception!", eacs.getFirstException()); } }
processメソッドは、起動時やアクセス時など、けっこう随所で呼び出されます。今回は、SingleFileStoreの実装を参考にして作成しました。
引数で渡されたスレッドプールを使用してExecutorAllCompletionServiceを作成し、自Nodeに割り当てられたキーに対応する情報をロードします。
自NodeかどうかはAdvancedCacheLoader.KeyFilterで判定するのですが、こちらがnullの場合は全キーを対象にするようにPersistenceUtil.notNullで全キーを対象とするようなKeyFilterのインスタンスを受け取るようにしています。
fetchValueやfetchMetadataの引数は、Cache#getや起動時にCacheStoreのpreloadやfetchPersistentStateなどを有効にしておいたりするとtrueで呼び出されます。それ以外の時は、キーのロードのみ(Cacheのエントリ数の把握のため?)な感じでしょうか。
あと、余談としてCallableをVoidでパラメータ適用して、
eacs.submit(new Callable[Void] {
メソッドの最後でnullを返していますが、これはScalaのUnitをここに書けず…(ExecutorAllCompletionService#submitがCallable[Void]を要求しているため)、仕方なくこのような形になりました。
続いて、sizeメソッド。今回は、単純に内部で持っているMapのサイズを返却するようにしました。
// from AdvancedCacheLoader override def size: Int = { println(s"Store[$storeName] size") store.size }
続いて、AdvancedCacheWriter側のメソッド。
clearメソッドは、単純に内部で持っているMapをクリアするように実装しました。
// from AdvancedCacheWriter override def clear(): Unit = { println(s"Store[$storeName] clear") store.clear() }
このメソッドを実装することで、Cache#clearにStoreもついてくることができるようになります。
そして、purgeメソッド。
// from AdvancedCacheWriter override def purge(executor: Executor, listener: AdvancedCacheWriter.PurgeListener[_]): Unit = { println(s"Store[$storeName] purge") executor.execute(new Runnable { override def run(): Unit = { val now = System.currentTimeMillis val marshaller = ctx.getMarshaller val factory = ctx.getByteBufferFactory val marshalledEntryFactory = ctx.getMarshalledEntryFactory store.foreach { case (k, (v, m)) => val (kb, vb, mb) = (marshaller.objectToByteBuffer(k), marshaller.objectToByteBuffer(v), m) val (keyBuffer, valueBuffer, metaBuffer) = (factory.newByteBuffer(kb, 0, kb.size), factory.newByteBuffer(vb, 0, vb.size), factory.newByteBuffer(mb, 0, mb.size)) val marshalledEntry = marshalledEntryFactory .newMarshalledEntry(keyBuffer, valueBuffer, metaBuffer) if (marshalledEntry.getMetadata.isExpired(now)) { println(s"Store[$storeName] purge key[$k]") store -= k if (listener != null) { println(s"Store[$storeName] listner nofiticate, key[$k]") val task = listener.asInstanceOf[AdvancedCacheWriter.PurgeListener[K]] task.entryPurged(k) } } } } }) }
こちらは、有効期限切れのエントリを削除する場合に呼び出されます。expirationのwakeUpIntervalを短い値で調整すると、動作しているのを見ることができるでしょう。
こちらもExecutorを用いて実装し、エントリをパージした場合はAdvancedCacheWriter.PurgeListener#entryPurgedメソッドを呼び出して通知します。
また、このように有効期限切れを意識した関係上、CacheLoaderとして実装するloadメソッドの中身も、有効期限を見るように変更しておきました。
// from CacheLoader override def load(key: K): MarshalledEntry[K, V] = load(key, true, true) def load(key: K, fetchValue: Boolean, fetchMetadata: Boolean): MarshalledEntry[K, V] = { println(s"Store[$storeName] try load key[$key], fetchValue[$fetchValue], fetchMetadata[$fetchMetadata]") val value = store.get(key) val marshaller = ctx.getMarshaller val binaryKey = marshaller.objectToByteBuffer(key) val factory = ctx.getByteBufferFactory val keyBuffer = factory.newByteBuffer(binaryKey, 0, binaryKey.size) value match { case Some((v, m)) => val valueBuffer = if (fetchValue) { val binaryValue = marshaller.objectToByteBuffer(v) factory.newByteBuffer(binaryValue, 0, binaryValue.size) } else { null } val metaBuffer = if (fetchMetadata && m != null) factory.newByteBuffer(m, 0, m.size) else null val marshalledEntry = ctx .getMarshalledEntryFactory .newMarshalledEntry(keyBuffer, valueBuffer, metaBuffer) .asInstanceOf[MarshalledEntry[K, V]] val now = System.currentTimeMillis val metadata = marshalledEntry.getMetadata if (metadata != null && metadata.isExpired(now)) { println(s"Store[$storeName] loaded, but expire entry, key[$key]") store -= key null } else { marshalledEntry } case None => println(s"Store[$storeName] missing key[$key]") null } }
テストコード
では、動作確認を。前回のテストコードとそんなに変わらないので、断片だけ。
describe("simple map cache store spec") { it("persistence") { val numInstances = 2 SimpleMapCacheStore.instances(numInstances) withStoreCache[String, String]("storeCache", numInstances) { cache => println(s"Cache size => ${cache.size}") val range = 1 to 10 range.foreach(i => println(s"key[${s"key$i"}] => ${cache.get(s"key$i")}")) (1 to 10) foreach (i => cache.put(s"key$i", s"value$i")) /* println("Wait...") Thread.sleep(5000L) range.foreach(i => println(s"key[${s"key$i"}] => ${cache.get(s"key$i")}")) */ } } }
コメントアウトしている箇所は、有効期限切れの確認などに。
Cacheの設定は、こんな感じで。
val persistenceBuilder = new ConfigurationBuilder() .clustering .cacheMode(CacheMode.DIST_SYNC) .expiration // .maxIdle(3000L) // .wakeUpInterval(500L) .persistence manager.defineConfiguration(cacheName, persistenceBuilder .addStore(new SimpleMapCacheStoreConfigurationBuilder(persistenceBuilder)) .fetchPersistentState(false) .preload(false) .shared(false) .purgeOnStartup(false) .ignoreModifications(false) .build)
コメントアウトしている箇所は、やっぱり有効期限切れの確認に利用。
とりあえず、ほとんどの設定項目をfalseにしていますが、1度動かしてみます。永続化したエントリは存在しない、空っぽの状態で。
> test Store Name = store-1.dmp Store[store-1.dmp], not exists. Store Name = store-2.dmp Store[store-2.dmp], not exists. Store[store-1.dmp] process, fetchValue[false], fetchMetadata[false] Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false] Store[store-1.dmp] process, fetchValue[false], fetchMetadata[false] Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false] Store[store-1.dmp] size Cache size => 0 Store[store-1.dmp] try load key[key1], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key1] key[key1] => null Store[store-1.dmp] try load key[key2], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key2] key[key2] => null Store[store-1.dmp] try load key[key3], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key3] key[key3] => null Store[store-1.dmp] try load key[key4], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key4] key[key4] => null Store[store-1.dmp] try load key[key5], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key5] key[key5] => null Store[store-1.dmp] try load key[key6], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key6] key[key6] => null Store[store-1.dmp] try load key[key7], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key7] key[key7] => null Store[store-1.dmp] try load key[key8], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key8] key[key8] => null Store[store-1.dmp] try load key[key9], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key9] key[key9] => null Store[store-1.dmp] try load key[key10], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key10] key[key10] => null Store[store-1.dmp] try load key[key1], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key1] Store[store-2.dmp] write (key, value) = (key1, value1) Store[store-1.dmp] write (key, value) = (key1, value1) Store[store-1.dmp] try load key[key2], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key2] Store[store-2.dmp] write (key, value) = (key2, value2) Store[store-1.dmp] write (key, value) = (key2, value2) Store[store-1.dmp] try load key[key3], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key3] Store[store-2.dmp] write (key, value) = (key3, value3) Store[store-1.dmp] write (key, value) = (key3, value3) Store[store-1.dmp] try load key[key4], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key4] Store[store-2.dmp] write (key, value) = (key4, value4) Store[store-1.dmp] write (key, value) = (key4, value4) Store[store-1.dmp] try load key[key5], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key5] Store[store-1.dmp] write (key, value) = (key5, value5) Store[store-2.dmp] write (key, value) = (key5, value5) Store[store-1.dmp] try load key[key6], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key6] Store[store-2.dmp] write (key, value) = (key6, value6) Store[store-1.dmp] write (key, value) = (key6, value6) Store[store-1.dmp] try load key[key7], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key7] Store[store-2.dmp] write (key, value) = (key7, value7) Store[store-1.dmp] write (key, value) = (key7, value7) Store[store-1.dmp] try load key[key8], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key8] Store[store-2.dmp] write (key, value) = (key8, value8) Store[store-1.dmp] write (key, value) = (key8, value8) Store[store-1.dmp] try load key[key9], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key9] Store[store-1.dmp] write (key, value) = (key9, value9) Store[store-2.dmp] write (key, value) = (key9, value9) Store[store-1.dmp] try load key[key10], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] missing key[key10] Store[store-1.dmp] write (key, value) = (key10, value10) Store[store-2.dmp] write (key, value) = (key10, value10) Store[store-1.dmp] saved, keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3) Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false] Store[store-2.dmp] saved, keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3)
起動時に、ちらちらprocessメソッドが呼び出されています。あ、sizeにも反応していますね。
で、2回目実行してみると、やたらといろいろ出るようになります。
> test Store Name = store-1.dmp Loaded From Store[store-1.dmp], keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3) Store Name = store-2.dmp Loaded From Store[store-2.dmp], keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3) Store[store-1.dmp] process, fetchValue[false], fetchMetadata[false] Store[store-1.dmp] try load key[key6], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] try load key[key9], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] try load key[key8], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] try load key[key5], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] try load key[key2], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key6], on process Store[store-1.dmp] loaded key[key2], on process Store[store-1.dmp] loaded key[key5], on process Store[store-1.dmp] loaded key[key9], on process Store[store-1.dmp] loaded key[key8], on process Store[store-2.dmp] try load key[key6], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] try load key[key9], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] try load key[key8], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] try load key[key5], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] try load key[key3], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key3], on process Store[store-2.dmp] loaded key[key6], on process Store[store-2.dmp] loaded key[key9], on process Store[store-2.dmp] loaded key[key5], on process Store[store-2.dmp] try load key[key1], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key1], on process Store[store-2.dmp] try load key[key10], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key10], on process Store[store-2.dmp] try load key[key3], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key3], on process Store[store-2.dmp] try load key[key7], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key7], on process Store[store-2.dmp] try load key[key4], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key4], on process Store[store-2.dmp] loaded key[key8], on process Store[store-1.dmp] try load key[key10], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key10], on process Store[store-1.dmp] try load key[key1], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key1], on process Store[store-1.dmp] try load key[key7], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key7], on process Store[store-1.dmp] try load key[key4], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key4], on process Store[store-2.dmp] try load key[key2], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key2], on process Store[store-1.dmp] process, fetchValue[false], fetchMetadata[false] Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false] Store[store-1.dmp] try load key[key5], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] try load key[key5], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key5], on process Store[store-2.dmp] try load key[key4], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key4], on process Store[store-2.dmp] try load key[key7], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key7], on process Store[store-2.dmp] try load key[key1], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key1], on process Store[store-2.dmp] try load key[key10], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key10], on process Store[store-2.dmp] try load key[key3], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key3], on process Store[store-1.dmp] try load key[key6], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key6], on process Store[store-1.dmp] try load key[key9], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key9], on process Store[store-1.dmp] try load key[key8], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key8], on process Store[store-1.dmp] try load key[key2], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key2], on process Store[store-2.dmp] try load key[key6], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key6], on process Store[store-2.dmp] try load key[key9], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key9], on process Store[store-2.dmp] try load key[key8], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key8], on process Store[store-2.dmp] try load key[key2], fetchValue[false], fetchMetadata[false] Store[store-2.dmp] loaded key[key2], on process Store[store-1.dmp] size Cache size => 10 Store[store-1.dmp] try load key[key1], fetchValue[true], fetchMetadata[true] key[key1] => value1 Store[store-1.dmp] loaded key[key5], on process Store[store-1.dmp] try load key[key4], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key4], on process Store[store-1.dmp] try load key[key10], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key10], on process Store[store-1.dmp] try load key[key3], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key3], on process Store[store-1.dmp] try load key[key7], fetchValue[false], fetchMetadata[false] Store[store-1.dmp] loaded key[key7], on process Store[store-1.dmp] try load key[key2], fetchValue[true], fetchMetadata[true] key[key2] => value2 Store[store-1.dmp] try load key[key3], fetchValue[true], fetchMetadata[true] key[key3] => value3 Store[store-1.dmp] try load key[key4], fetchValue[true], fetchMetadata[true] key[key4] => value4 Store[store-1.dmp] try load key[key5], fetchValue[true], fetchMetadata[true] key[key5] => value5 Store[store-1.dmp] try load key[key6], fetchValue[true], fetchMetadata[true] key[key6] => value6 Store[store-1.dmp] try load key[key7], fetchValue[true], fetchMetadata[true] key[key7] => value7 Store[store-1.dmp] try load key[key8], fetchValue[true], fetchMetadata[true] key[key8] => value8 Store[store-1.dmp] try load key[key9], fetchValue[true], fetchMetadata[true] key[key9] => value9 Store[store-1.dmp] try load key[key10], fetchValue[true], fetchMetadata[true] key[key10] => value10 Store[store-2.dmp] write (key, value) = (key1, value1) Store[store-1.dmp] write (key, value) = (key1, value1) Store[store-2.dmp] write (key, value) = (key2, value2) Store[store-1.dmp] write (key, value) = (key2, value2) Store[store-2.dmp] write (key, value) = (key3, value3) Store[store-1.dmp] write (key, value) = (key3, value3) Store[store-2.dmp] write (key, value) = (key4, value4) Store[store-1.dmp] write (key, value) = (key4, value4) Store[store-1.dmp] try load key[key5], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] write (key, value) = (key5, value5) Store[store-2.dmp] write (key, value) = (key5, value5) Store[store-2.dmp] write (key, value) = (key6, value6) Store[store-1.dmp] write (key, value) = (key6, value6) Store[store-2.dmp] write (key, value) = (key7, value7) Store[store-1.dmp] write (key, value) = (key7, value7) Store[store-2.dmp] write (key, value) = (key8, value8) Store[store-1.dmp] write (key, value) = (key8, value8) Store[store-1.dmp] try load key[key9], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] write (key, value) = (key9, value9) Store[store-2.dmp] write (key, value) = (key9, value9) Store[store-1.dmp] try load key[key10], fetchValue[true], fetchMetadata[true] Store[store-1.dmp] write (key, value) = (key10, value10) Store[store-2.dmp] write (key, value) = (key10, value10) Store[store-1.dmp] saved, keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3) Store[store-2.dmp] process, fetchValue[false], fetchMetadata[false] Store[store-2.dmp] saved, keys = Set(key6, key9, key8, key2, key5, key4, key7, key1, key10, key3)
最初に、processメソッドでキーの情報だけをロードしている模様で。
まあ、こうなるならstartメソッドでキー情報全部抜かなくても、processでもいい気がしますね。JdbcStoreは、そのような実装になっています。SingleFileStoreは、start時ですが。
ちなみに、前回は特に書いていませんでしたが、上記の設定だとこんなログが出ていました。
8 03, 2014 7:03:27 午後 org.infinispan.configuration.cache.AbstractStoreConfigurationBuilder validate WARN: ISPN000149: Fetch persistent state and purge on startup are both disabled, cache may contain stale entries on startup 8 03, 2014 7:03:27 午後 org.infinispan.configuration.cache.AbstractStoreConfigurationBuilder validate WARN: ISPN000149: Fetch persistent state and purge on startup are both disabled, cache may contain stale entries on startup 8 03, 2014 7:03:27 午後 org.infinispan.configuration.cache.AbstractStoreConfigurationBuilder validate WARN: ISPN000149: Fetch persistent state and purge on startup are both disabled, cache may contain stale entries on startup 8 03, 2014 7:03:27 午後 org.infinispan.configuration.cache.AbstractStoreConfigurationBuilder validate WARN: ISPN000149: Fetch persistent state and purge on startup are both disabled, cache may contain stale entries on startup
fetchPersistentStateとpurgeOnStartupの両方が無効になっているから、起動時に古いエントリを読む可能性があるよってことを言っているみたいです。
で、これを踏まえて残りの設定項目を見ていくと…。
purgeOnStartup
purgeOnStarupをtrueにすると、起動時にAdvancedCacheWriterのclearメソッドが呼び出され、CacheStoreを空にしようとします。
preload
preloadをtrueにすると、起動時にAdvancedCacheLoaderのprocessメソッドが、fetchValueおよびfetchMetadataがtrueで呼び出され、エントリの情報を値、メタデータ含めてロードしにいきます。この場合、Cache#getを使用した場合には、先の例のようにCache#get時にCacheLoader#loadメソッドは呼び出さなくなるようです。
起動時にあらかじめロードしておきたい場合は便利なようですが、Nodeにローカルに持ってしまうところが注意事項だとか。
また、片方のNode(のStore)でロードした後、もう片方のNode(のStore)に書き込みにいくような挙動も見せましたが、これは単純に書き込みイベントが発生したからでしょうか…。
fetchPersistentState
fetchPersistentStateをtrueにすると、Cacheを操作するまでにprocessメソッドがより多く呼び出されるような感じの挙動になりました。この時、fetchValueおよびfetchMetadataがtrueの呼び出しも入ります。最初は、falseで呼び出されますが。
ロード処理が入ることになるので、ちゃんと有効期限のチェックをloadで入れておけば、不要エントリの廃棄もここで可能なようです。
*これは、preloadでも可能な気がしますけど
でまあ、やっぱりよくわからなかったので、使われていると思われる箇所を探してみました。以下がそれっぽいですね。
OutboundTransferTask
https://github.com/infinispan/infinispan/blob/6.0.2.Final/core/src/main/java/org/infinispan/statetransfer/OutboundTransferTask.java#L149
クラスタ参加時に、参加したNodeの持つエントリの情報を送信しているようです。で、Storeにあったエントリが更新されていた場合は、その情報は遅れて起動した側のStoreへの書き込み操作が発生して取り込めるようです。
それに、クラスタに参加するまでに削除されたエントリについてはさすがに反映されない模様。
まあ、挙動を見ていると、起動時のprocessメソッド呼び出し後に、少なくとも更新エントリを含めて、現存するエントリ情報が渡ってくるようなので、場合によっては入れた方がいいのかも?fetchPersistentStateを設定しなくても更新があったものについては受け取れるようですが、更新がないものについては遅れて起動した側のStoreには書き込みが行われませんでしたから…。
それで
fetchPersistentStateは理解が怪しいところがありますが、なんとなく挙動はつかめた感じがします。
また、今回は2Nodeでやりましたが、4つとかに増やしたりすると裏で激しくprocessメソッドが呼び出されたりするので、まだいろいろあるんだろうなぁと…。
とりあえず、今回更新したソースは、こちらに反映しておきました。
https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-persistence