Cacheに対するputやgetなどの各種操作を実行した時の裏舞台を少し知りたいと思い、その内部を少し追ってみることにしました。
Cacheに対するputやgetは、Commandというものに委譲して行われるようです。
CacheImpl#putより。
https://github.com/infinispan/infinispan/blob/8.2.2.Final/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java#L1106-L1122
@SuppressWarnings("unchecked") final V put(K key, V value, Metadata metadata, EnumSet<Flag> explicitFlags, ClassLoader explicitClassLoader) { assertKeyValueNotNull(key, value); InvocationContext ctx = getInvocationContextWithImplicitTransaction(false, explicitClassLoader, 1); return putInternal(key, value, metadata, explicitFlags, ctx); } @SuppressWarnings("unchecked") private V putInternal(K key, V value, Metadata metadata, EnumSet<Flag> explicitFlags, InvocationContext ctx) { Set<Flag> flags = addUnsafeFlags(explicitFlags); Metadata merged = applyDefaultMetadata(metadata); PutKeyValueCommand command = commandsFactory.buildPutKeyValueCommand(key, value, merged, flags); ctx.setLockOwner(command.getKeyLockOwner()); return (V) executeCommandAndCommitIfNeeded(ctx, command); } // 少し離れて private Object executeCommandAndCommitIfNeeded(InvocationContext ctx, VisitableCommand command) { final boolean txInjected = isTxInjected(ctx); Object result; try { result = invoker.invoke(ctx, command); } catch (RuntimeException e) { if (txInjected) tryRollback(); throw e; } if (txInjected) { tryCommit(); } return result; }
CacheImpl#getより。
https://github.com/infinispan/infinispan/blob/8.2.2.Final/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java#L406-L412
@SuppressWarnings("unchecked") final V get(Object key, EnumSet<Flag> explicitFlags, ClassLoader explicitClassLoader) { assertKeyNotNull(key); InvocationContext ctx = getInvocationContextForRead(explicitClassLoader, 1); GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(key, explicitFlags); return (V) invoker.invoke(ctx, command); }
ここで、invokerというのはInfinispanが内部で持つInterceptorが登録されたものです。
protected InterceptorChain invoker;
Cacheの構成状態でCommandが呼び出されるまでのInterceptorの組み合わせが変わるようなのですが、今回はここを追ってみたいと思います。
準備
まずは、ビルド定義から。
build.sbt
name := "embedded-print-interceptors" version := "0.0.1-SNAPSHOT" organization := "org.littlewings" scalaVersion := "2.11.8" updateOptions := updateOptions.value.withCachedResolution(true) scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") javaOptions in Test += "-javaagent:" + System.getenv("BYTEMAN_HOME") + "/lib/byteman.jar=script:trace.btm" fork in Test := true parallelExecution in Test := false libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "8.2.2.Final", "net.jcip" % "jcip-annotations" % "1.0" % "provided", "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
なぜかBytemanの設定が入っていますが、そのあたりは後ほど。
Infinispanの設定ファイルは、以下のもので用意しました。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd" xmlns="urn:infinispan:config:8.2"> <jgroups> <stack-file name="udp" path="jgroups.xml"/> </jgroups> <cache-container default-cache="localCache"> <jmx duplicate-domains="true"/> <transport cluster="cluster" stack="udp"/> <local-cache name="localCache"/> <replicated-cache name="replicatedCache"/> <distributed-cache name="distributedCache"/> <invalidation-cache name="invalidationCache"/> <local-cache name="localTxCache"> <transaction mode="NON_XA"/> </local-cache> <replicated-cache name="replicatedTxCache"> <transaction mode="NON_XA"/> </replicated-cache> <distributed-cache name="distributedTxCache"> <transaction mode="NON_XA"/> </distributed-cache> <invalidation-cache name="invalidationTxCache"> <transaction mode="NON_XA"/> </invalidation-cache> <distributed-cache name="distributedL1Cache" l1-lifespan="1000"/> <distributed-cache name="distributedL1TxCache" l1-lifespan="1000"> <transaction mode="NON_XA"/> </distributed-cache> </cache-container> </infinispan>
JGroupsの設定は、端折ります。
Local Cache、Replicated Cache、Distributed Cache、Invalidation Cache、それぞれのトランザクションに対応したCacheを用意しました。
追記)
ただ、今回はクラスタを構成せずにやったので、あくまでLocal Nodeでの挙動しか見れない感じになっています…。あとで気付きました。
また、Distributed CacheのみL1 Cacheを有効にしたパターンも用意しています。
これらを使って、Cacheの種類や設定からInterceptorがどのように変わっていくか確認してみましょう。
InterceptorとCommand
Custom Interceptorを扱った時にも少し見たのですが、
InfinispanのCustom Interceptorを実装する - CLOVER
InterceptorはVisitorパターンで構成されており、visit◯×みたいなメソッドを多く持っています。
種類でいくと、ざっとこれだけです(8.2.2.Final)。
- visitPutKeyValueCommand
- visitRemoveCommand
- visitReplaceCommand
- visitClearCommand
- visitPutMapCommand
- visitEvictCommand
- visitApplyDeltaCommand
- visitSizeCommand
- visitGetKeyValueCommand
- visitGetCacheEntryCommand
- visitGetAllCommand
- visitKeySetCommand
- visitEntrySetCommand
- visitPrepareCommand
- visitRollbackCommand
- visitCommitCommand
- visitInvalidateCommand
- visitInvalidateL1Command
- visitLockControlCommand
- visitUnknownCommand
- visitDistributedExecuteCommand
- visitGetKeysInGroupCommand
- visitReadOnlyKeyCommand
- visitReadOnlyManyCommand
- visitWriteOnlyKeyCommand
- visitReadWriteKeyValueCommand
- visitReadWriteKeyCommand
- visitWriteOnlyManyEntriesCommand
- visitWriteOnlyKeyValueCommand
- visitWriteOnlyManyCommand
- visitReadWriteManyCommand
- visitReadWriteManyEntriesCommand
そして、チェーンされたInterceptorの最後にCommand(ReplicableCommand)が呼び出されるようになっています。
で、今回はCacheの種類や構成でどのようなInterceptorが組み上げられていくかを見てみるわけです。
どのようなInterceptorが組み込まれるかは、InterceptorChainFactory#buildInterceptorChainで決められるようです。
それでは、テストコードを書きつつ確認してみましょう。
テストコードの雛形
今回作成したテストコードの雛形は、こちら。
src/test/scala/org/littlewings/infinispan/interceptors/PrintInterceptorsSpec.scala
package org.littlewings.infinispan.interceptors import org.infinispan.Cache import org.infinispan.interceptors.distribution._ import org.infinispan.interceptors.locking.{NonTransactionalLockingInterceptor, OptimisticLockingInterceptor} import org.infinispan.interceptors._ import org.infinispan.manager.DefaultCacheManager import org.infinispan.statetransfer.{StateTransferInterceptor, TransactionSynchronizerInterceptor} import org.scalatest.{FunSpec, Matchers} import scala.collection.JavaConverters._ class PrintInterceptorsSpec extends FunSpec with Matchers { describe("Print Interceptors Spec") { // ここに、テストを書く! } protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(fun: Cache[K, V] => Unit): Unit = { val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml")) val cache = managers(0).getCache[K, V](cacheName) try { fun(cache) } finally { managers.foreach(_.getCache[K, V](cacheName).stop()) managers.foreach(_.stop()) } } }
Cacheを利用する簡易メソッド付き。今回は、クラスタは組みませんでしたが…。
Local Cache
それでは、Local Cacheから見てみます。どのようなInterceptorが利用されているかを確認するには、AdvancedCacheのgetInterceptorChainメソッドを利用すればOKです。
というわけで、ほぼデフォルトのLocal Cacheで使われているInterceptorを見ると、こんな感じになっていました。
it("local-cache") { withCache[String, String]("localCache") { cache => cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs ( Seq( classOf[InvocationContextInterceptor], classOf[CacheMgmtInterceptor], classOf[NonTransactionalLockingInterceptor], classOf[EntryWrappingInterceptor], classOf[CallInterceptor] ) ) } }
InvocationContextInterceptorを先頭に、最後にCallInterceptorが来るといった構成になっています。
他のCacheもどんどん見ていきましょう。
Replicated Cache/Distributed Cache
Replicated CacheとDistributed Cacheは、構成するInterceptorが同じになるようです。
it("replicated-cache") { withCache[String, String]("replicatedCache") { cache => cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs ( Seq( classOf[DistributionBulkInterceptor[_, _]], classOf[InvocationContextInterceptor], classOf[CacheMgmtInterceptor], classOf[StateTransferInterceptor], classOf[NonTransactionalLockingInterceptor], classOf[EntryWrappingInterceptor], classOf[NonTxDistributionInterceptor], classOf[CallInterceptor] ) ) } } it("distributed-cache") { withCache[String, String]("distributedCache") { cache => cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs ( Seq( classOf[DistributionBulkInterceptor[_, _]], classOf[InvocationContextInterceptor], classOf[CacheMgmtInterceptor], classOf[StateTransferInterceptor], classOf[NonTransactionalLockingInterceptor], classOf[EntryWrappingInterceptor], classOf[NonTxDistributionInterceptor], classOf[CallInterceptor] ) ) } }
Local Cacheの時と比べるとDistributionBulkInterceptorが先頭に来て、StateTransferInterceptor、NonTxDistributionInterceptorが追加されます。
Invalidation Cache
Invalidation Cacheの場合はLocal Cacheにとても近いですが、InvalidationInterceptorが追加されます。
it("invalidation-cache") { withCache[String, String]("invalidationCache") { cache => cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs ( Seq( classOf[InvocationContextInterceptor], classOf[CacheMgmtInterceptor], classOf[NonTransactionalLockingInterceptor], classOf[EntryWrappingInterceptor], classOf[InvalidationInterceptor], classOf[CallInterceptor] ) ) } }
トランザクションを有効にしてみる
次に、各Cacheでトランザクションを有効にしてみましょう。デフォルト状態の各種Cacheに、NON_XAな設定を追加してみます。
結果は、このように。
Local Cache
TxInterceptor、OptimisticLockingInterceptor、NotificationInterceptorが増えています。
it("transactional local-cache") { withCache[String, String]("localTxCache") { cache => cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs ( Seq( classOf[InvocationContextInterceptor], classOf[CacheMgmtInterceptor], classOf[TxInterceptor[_, _]], classOf[OptimisticLockingInterceptor], classOf[NotificationInterceptor], classOf[EntryWrappingInterceptor], classOf[CallInterceptor] ) ) } }
Replicated Cache/Distributed Cache
Replicated Cache/Distributed Cacheの場合は、TransactionSynchronizerInterceptor、TxInterceptor、NotificationInterceptorが追加され、一部のInterceptorがOptimisticLockingInterceptor、TxDistributionInterceptorに変更されます。
it("transactional replicated-cache") { withCache[String, String]("replicatedTxCache") { cache => cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs ( Seq( classOf[DistributionBulkInterceptor[_, _]], classOf[InvocationContextInterceptor], classOf[CacheMgmtInterceptor], classOf[StateTransferInterceptor], classOf[TransactionSynchronizerInterceptor], classOf[TxInterceptor[_, _]], classOf[OptimisticLockingInterceptor], classOf[NotificationInterceptor], classOf[EntryWrappingInterceptor], classOf[TxDistributionInterceptor], classOf[CallInterceptor] ) ) } } it("transactional distributed-cache") { withCache[String, String]("distributedTxCache") { cache => cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs ( Seq( classOf[DistributionBulkInterceptor[_, _]], classOf[InvocationContextInterceptor], classOf[CacheMgmtInterceptor], classOf[StateTransferInterceptor], classOf[TransactionSynchronizerInterceptor], classOf[TxInterceptor[_, _]], classOf[OptimisticLockingInterceptor], classOf[NotificationInterceptor], classOf[EntryWrappingInterceptor], classOf[TxDistributionInterceptor], classOf[CallInterceptor] ) ) } }
Invalidation Cache
Invalidation Cacheの場合は、TxInterceptor、NotificationInterceptorが追加され、一部のInterceptorがOptimisticLockingInterceptorになります。
it("transactional invalidation-cache") { withCache[String, String]("invalidationTxCache") { cache => cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs ( Seq( classOf[InvocationContextInterceptor], classOf[CacheMgmtInterceptor], classOf[TxInterceptor[_, _]], classOf[OptimisticLockingInterceptor], classOf[NotificationInterceptor], classOf[EntryWrappingInterceptor], classOf[InvalidationInterceptor], classOf[CallInterceptor] ) ) } }
とりあえず今回は単純にNON_XAにしただけですが、ロック戦略を変えた場合はOptimisticLockingInterceptorからPessimisticLockingInterceptorになるでしょうし、その他の設定でもいろいろ変わるでしょう。
Distributed Cache+L1 Cache
あと、構成のバリエーションとして、Distributed CacheにL1 Cacheを付けてみましょう。Non TransactionalなCache、TransactionalなCacheそれぞれで試してみました。
it("l1 enabled, distributed-cache") { withCache[String, String]("distributedL1Cache") { cache => cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs ( Seq( classOf[DistributionBulkInterceptor[_, _]], classOf[InvocationContextInterceptor], classOf[CacheMgmtInterceptor], classOf[StateTransferInterceptor], classOf[NonTransactionalLockingInterceptor], classOf[L1LastChanceInterceptor], classOf[EntryWrappingInterceptor], classOf[L1NonTxInterceptor], classOf[NonTxDistributionInterceptor], classOf[CallInterceptor] ) ) } } it("l1 enabled, transactional distributed-cache") { withCache[String, String]("distributedL1TxCache") { cache => cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs ( Seq( classOf[DistributionBulkInterceptor[_, _]], classOf[InvocationContextInterceptor], classOf[CacheMgmtInterceptor], classOf[StateTransferInterceptor], classOf[TransactionSynchronizerInterceptor], classOf[TxInterceptor[_, _]], classOf[OptimisticLockingInterceptor], classOf[NotificationInterceptor], classOf[L1LastChanceInterceptor], classOf[EntryWrappingInterceptor], classOf[L1TxInterceptor], classOf[TxDistributionInterceptor], classOf[CallInterceptor] ) ) } }
L1LastChanceInterceptorと、L1TxInterceptorが増えていますね。
Distributed Cacheでput/getを動かした時の、InterceptorからCommandが呼び出されるまでをトレースしてみる
最後に、Distributed Cacheのput/getをした時に、InterceptorのChainからCommandが呼び出されるまでの間をトレースしてみたいと思います。
トレースはBytemanで行いました。Visitor(Interceptor)の各種visit◯×メソッドのentry/exit時、ReplicableCommand#performのentry/exit時にコンソール出力するようにしています。
このルールは、自動生成しています。
https://github.com/kazuhira-r/infinispan-getting-started/blob/master/embedded-print-interceptors/visitor-trace-rule-generate.sh
で、テストコードを実行すると
it("distributed-cache, put/get") { withCache[String, String]("distributedCache") { cache => cache.put("key1", "value1") cache.get("key1") should be("value1") } }
こういう結果が得られます。
※インデントを入れて、少し見やすくしました
put。
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry [Command] PutKeyValueCommand:perform, entry [Command] PutKeyValueCommand:perform, exit [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
get。
[Interceptor] DistributionBulkInterceptor:visitGetKeyValueCommand, entry [Interceptor] InvocationContextInterceptor:visitGetKeyValueCommand, entry [Interceptor] CacheMgmtInterceptor:visitGetKeyValueCommand, entry [Interceptor] StateTransferInterceptor:visitGetKeyValueCommand, entry [Interceptor] NonTransactionalLockingInterceptor:visitGetKeyValueCommand, entry [Interceptor] EntryWrappingInterceptor:visitGetKeyValueCommand, entry [Interceptor] NonTxDistributionInterceptor:visitGetKeyValueCommand, entry [Interceptor] CallInterceptor:visitGetKeyValueCommand, entry [Command] GetKeyValueCommand:perform, entry [Command] GetKeyValueCommand:perform, exit [Interceptor] CallInterceptor:visitGetKeyValueCommand, exit [Interceptor] NonTxDistributionInterceptor:visitGetKeyValueCommand, exit [Interceptor] EntryWrappingInterceptor:visitGetKeyValueCommand, exit [Interceptor] NonTransactionalLockingInterceptor:visitGetKeyValueCommand, exit [Interceptor] StateTransferInterceptor:visitGetKeyValueCommand, exit [Interceptor] CacheMgmtInterceptor:visitGetKeyValueCommand, exit [Interceptor] InvocationContextInterceptor:visitGetKeyValueCommand, exit [Interceptor] DistributionBulkInterceptor:visitGetKeyValueCommand, exit
Interceptorのvisitメソッドが次々と呼ばれ、最後にPutKeyValueCommandやGetKeyValueCommandのperformメソッドが呼び出されていることがわかります。
では、PutKeyValueCommandやGetKeyValueCommandなどのクラスが必要に応じてネットワーク通信などをするのかというと、そうでもないようです。
このあたりは、もう少しInterceptorやCommandの中身に踏み込む時に見ていきたいと思います。
追記)
今回は、単一Nodeで動作させたので、すごくシンプルな結果になりました。ホントはRPCとか発生するはずなのですが…このあたりをトレースしてみた、別のエントリを書きました。
InfinispanのDistributed Cacheのput/getをトレースする(Local NodeがOnwerの場合) - CLOVER
InfinispanのDistributed Cacheのput/getをトレースする(Local NodeがOnwerではない場合) - CLOVER
ただ、このあたりの実装はInfinispan 9.0.0でけっこう変わりそうですけどね。まずは基礎的に追ってみようということで。
まとめ
今回は、Infinispanの内部で使われているInterceptorが、Cacheの種類や設定で、どのように変わっていくかの一例を見てみました。
設定が変わると、Interceptorが追加されたり変わったりして挙動が変わっていくような雰囲気が感じられますね。
そのうち、もうちょっとこの詳細を追ってみましょう。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-print-interceptors