CLOVER🍀

That was when it all began.

InfinispanのInterceptorの組み合わせを、Cacheの種類ごとに見てみる

Cacheに対するputやgetなどの各種操作を実行した時の裏舞台を少し知りたいと思い、その内部を少し追ってみることにしました。

Cacheに対するputやgetは、Commandというものに委譲して行われるようです。
CacheImpl#putより。
https://github.com/infinispan/infinispan/blob/8.2.2.Final/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java#L1106-L1122

   @SuppressWarnings("unchecked")
   final V put(K key, V value, Metadata metadata,
         EnumSet<Flag> explicitFlags, ClassLoader explicitClassLoader) {
      assertKeyValueNotNull(key, value);
      InvocationContext ctx = getInvocationContextWithImplicitTransaction(false, explicitClassLoader, 1);
      return putInternal(key, value, metadata, explicitFlags, ctx);
   }

   @SuppressWarnings("unchecked")
   private V putInternal(K key, V value, Metadata metadata,
         EnumSet<Flag> explicitFlags, InvocationContext ctx) {
      Set<Flag> flags = addUnsafeFlags(explicitFlags);
      Metadata merged = applyDefaultMetadata(metadata);
      PutKeyValueCommand command = commandsFactory.buildPutKeyValueCommand(key, value, merged, flags);
      ctx.setLockOwner(command.getKeyLockOwner());
      return (V) executeCommandAndCommitIfNeeded(ctx, command);
   }

   // 少し離れて

   private Object executeCommandAndCommitIfNeeded(InvocationContext ctx, VisitableCommand command) {
      final boolean txInjected = isTxInjected(ctx);
      Object result;
      try {
         result = invoker.invoke(ctx, command);
      } catch (RuntimeException e) {
         if (txInjected) tryRollback();
         throw e;
      }

      if (txInjected) {
         tryCommit();
      }

      return result;
   }

CacheImpl#getより。
https://github.com/infinispan/infinispan/blob/8.2.2.Final/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java#L406-L412

   @SuppressWarnings("unchecked")
   final V get(Object key, EnumSet<Flag> explicitFlags, ClassLoader explicitClassLoader) {
      assertKeyNotNull(key);
      InvocationContext ctx = getInvocationContextForRead(explicitClassLoader, 1);
      GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(key, explicitFlags);
      return (V) invoker.invoke(ctx, command);
   }

ここで、invokerというのはInfinispanが内部で持つInterceptorが登録されたものです。

   protected InterceptorChain invoker;

Cacheの構成状態でCommandが呼び出されるまでのInterceptorの組み合わせが変わるようなのですが、今回はここを追ってみたいと思います。

準備

まずは、ビルド定義から。
build.sbt

name := "embedded-print-interceptors"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.8"

updateOptions := updateOptions.value.withCachedResolution(true)

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

javaOptions in Test += "-javaagent:" + System.getenv("BYTEMAN_HOME") + "/lib/byteman.jar=script:trace.btm"

fork in Test := true

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "8.2.2.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

なぜかBytemanの設定が入っていますが、そのあたりは後ほど。

Infinispanの設定ファイルは、以下のもので用意しました。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd"
        xmlns="urn:infinispan:config:8.2">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container default-cache="localCache">
        <jmx duplicate-domains="true"/>
        <transport cluster="cluster" stack="udp"/>

        <local-cache name="localCache"/>
        <replicated-cache name="replicatedCache"/>
        <distributed-cache name="distributedCache"/>
        <invalidation-cache name="invalidationCache"/>

        <local-cache name="localTxCache">
            <transaction mode="NON_XA"/>
        </local-cache>
        <replicated-cache name="replicatedTxCache">
            <transaction mode="NON_XA"/>
        </replicated-cache>
        <distributed-cache name="distributedTxCache">
            <transaction mode="NON_XA"/>
        </distributed-cache>
        <invalidation-cache name="invalidationTxCache">
            <transaction mode="NON_XA"/>
        </invalidation-cache>

        <distributed-cache name="distributedL1Cache" l1-lifespan="1000"/>
        <distributed-cache name="distributedL1TxCache" l1-lifespan="1000">
            <transaction mode="NON_XA"/>
        </distributed-cache>
    </cache-container>
</infinispan>

JGroupsの設定は、端折ります。

Local Cache、Replicated Cache、Distributed Cache、Invalidation Cache、それぞれのトランザクションに対応したCacheを用意しました。

追記
ただ、今回はクラスタを構成せずにやったので、あくまでLocal Nodeでの挙動しか見れない感じになっています…。あとで気付きました。

また、Distributed CacheのみL1 Cacheを有効にしたパターンも用意しています。

これらを使って、Cacheの種類や設定からInterceptorがどのように変わっていくか確認してみましょう。

InterceptorとCommand

Custom Interceptorを扱った時にも少し見たのですが、

InfinispanのCustom Interceptorを実装する - CLOVER

InterceptorはVisitorパターンで構成されており、visit◯×みたいなメソッドを多く持っています。

https://github.com/infinispan/infinispan/blob/8.2.2.Final/core/src/main/java/org/infinispan/commands/Visitor.java

種類でいくと、ざっとこれだけです(8.2.2.Final)。

  • visitPutKeyValueCommand
  • visitRemoveCommand
  • visitReplaceCommand
  • visitClearCommand
  • visitPutMapCommand
  • visitEvictCommand
  • visitApplyDeltaCommand
  • visitSizeCommand
  • visitGetKeyValueCommand
  • visitGetCacheEntryCommand
  • visitGetAllCommand
  • visitKeySetCommand
  • visitEntrySetCommand
  • visitPrepareCommand
  • visitRollbackCommand
  • visitCommitCommand
  • visitInvalidateCommand
  • visitInvalidateL1Command
  • visitLockControlCommand
  • visitUnknownCommand
  • visitDistributedExecuteCommand
  • visitGetKeysInGroupCommand
  • visitReadOnlyKeyCommand
  • visitReadOnlyManyCommand
  • visitWriteOnlyKeyCommand
  • visitReadWriteKeyValueCommand
  • visitReadWriteKeyCommand
  • visitWriteOnlyManyEntriesCommand
  • visitWriteOnlyKeyValueCommand
  • visitWriteOnlyManyCommand
  • visitReadWriteManyCommand
  • visitReadWriteManyEntriesCommand

そして、チェーンされたInterceptorの最後にCommand(ReplicableCommand)が呼び出されるようになっています。

https://github.com/infinispan/infinispan/blob/8.2.2.Final/core/src/main/java/org/infinispan/commands/ReplicableCommand.java

で、今回はCacheの種類や構成でどのようなInterceptorが組み上げられていくかを見てみるわけです。

どのようなInterceptorが組み込まれるかは、InterceptorChainFactory#buildInterceptorChainで決められるようです。

https://github.com/infinispan/infinispan/blob/8.2.2.Final/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java#L105

それでは、テストコードを書きつつ確認してみましょう。

テストコードの雛形

今回作成したテストコードの雛形は、こちら。
src/test/scala/org/littlewings/infinispan/interceptors/PrintInterceptorsSpec.scala

package org.littlewings.infinispan.interceptors

import org.infinispan.Cache
import org.infinispan.interceptors.distribution._
import org.infinispan.interceptors.locking.{NonTransactionalLockingInterceptor, OptimisticLockingInterceptor}
import org.infinispan.interceptors._
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.statetransfer.{StateTransferInterceptor, TransactionSynchronizerInterceptor}
import org.scalatest.{FunSpec, Matchers}

import scala.collection.JavaConverters._

class PrintInterceptorsSpec extends FunSpec with Matchers {
  describe("Print Interceptors Spec") {
    // ここに、テストを書く!
  }

  protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))
    val cache = managers(0).getCache[K, V](cacheName)

    try {
      fun(cache)
    } finally {
      managers.foreach(_.getCache[K, V](cacheName).stop())
      managers.foreach(_.stop())
    }
  }
}

Cacheを利用する簡易メソッド付き。今回は、クラスタは組みませんでしたが…。

Local Cache

それでは、Local Cacheから見てみます。どのようなInterceptorが利用されているかを確認するには、AdvancedCacheのgetInterceptorChainメソッドを利用すればOKです。

というわけで、ほぼデフォルトのLocal Cacheで使われているInterceptorを見ると、こんな感じになっていました。

    it("local-cache") {
      withCache[String, String]("localCache") { cache =>
        cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs (
          Seq(
            classOf[InvocationContextInterceptor],
            classOf[CacheMgmtInterceptor],
            classOf[NonTransactionalLockingInterceptor],
            classOf[EntryWrappingInterceptor],
            classOf[CallInterceptor]
          )
          )
      }
    }

InvocationContextInterceptorを先頭に、最後にCallInterceptorが来るといった構成になっています。

他のCacheもどんどん見ていきましょう。

Replicated Cache/Distributed Cache

Replicated CacheとDistributed Cacheは、構成するInterceptorが同じになるようです。

    it("replicated-cache") {
      withCache[String, String]("replicatedCache") { cache =>
        cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs (
          Seq(
            classOf[DistributionBulkInterceptor[_, _]],
            classOf[InvocationContextInterceptor],
            classOf[CacheMgmtInterceptor],
            classOf[StateTransferInterceptor],
            classOf[NonTransactionalLockingInterceptor],
            classOf[EntryWrappingInterceptor],
            classOf[NonTxDistributionInterceptor],
            classOf[CallInterceptor]
          )
          )
      }
    }

    it("distributed-cache") {
      withCache[String, String]("distributedCache") { cache =>
        cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs (
          Seq(
            classOf[DistributionBulkInterceptor[_, _]],
            classOf[InvocationContextInterceptor],
            classOf[CacheMgmtInterceptor],
            classOf[StateTransferInterceptor],
            classOf[NonTransactionalLockingInterceptor],
            classOf[EntryWrappingInterceptor],
            classOf[NonTxDistributionInterceptor],
            classOf[CallInterceptor]
          )
          )
      }
    }

Local Cacheの時と比べるとDistributionBulkInterceptorが先頭に来て、StateTransferInterceptor、NonTxDistributionInterceptorが追加されます。

Invalidation Cache

Invalidation Cacheの場合はLocal Cacheにとても近いですが、InvalidationInterceptorが追加されます。

    it("invalidation-cache") {
      withCache[String, String]("invalidationCache") { cache =>
        cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs (
          Seq(
            classOf[InvocationContextInterceptor],
            classOf[CacheMgmtInterceptor],
            classOf[NonTransactionalLockingInterceptor],
            classOf[EntryWrappingInterceptor],
            classOf[InvalidationInterceptor],
            classOf[CallInterceptor]
          )
          )
      }
    }

トランザクションを有効にしてみる

次に、各Cacheでトランザクションを有効にしてみましょう。デフォルト状態の各種Cacheに、NON_XAな設定を追加してみます。

結果は、このように。

Local Cache

TxInterceptor、OptimisticLockingInterceptor、NotificationInterceptorが増えています。

    it("transactional local-cache") {
      withCache[String, String]("localTxCache") { cache =>
        cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs (
          Seq(
            classOf[InvocationContextInterceptor],
            classOf[CacheMgmtInterceptor],
            classOf[TxInterceptor[_, _]],
            classOf[OptimisticLockingInterceptor],
            classOf[NotificationInterceptor],
            classOf[EntryWrappingInterceptor],
            classOf[CallInterceptor]
          )
          )
      }
    }

Replicated Cache/Distributed Cache

Replicated Cache/Distributed Cacheの場合は、TransactionSynchronizerInterceptor、TxInterceptor、NotificationInterceptorが追加され、一部のInterceptorがOptimisticLockingInterceptor、TxDistributionInterceptorに変更されます。

    it("transactional replicated-cache") {
      withCache[String, String]("replicatedTxCache") { cache =>
        cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs (
          Seq(
            classOf[DistributionBulkInterceptor[_, _]],
            classOf[InvocationContextInterceptor],
            classOf[CacheMgmtInterceptor],
            classOf[StateTransferInterceptor],
            classOf[TransactionSynchronizerInterceptor],
            classOf[TxInterceptor[_, _]],
            classOf[OptimisticLockingInterceptor],
            classOf[NotificationInterceptor],
            classOf[EntryWrappingInterceptor],
            classOf[TxDistributionInterceptor],
            classOf[CallInterceptor]
          )
          )
      }
    }

    it("transactional distributed-cache") {
      withCache[String, String]("distributedTxCache") { cache =>
        cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs (
          Seq(
            classOf[DistributionBulkInterceptor[_, _]],
            classOf[InvocationContextInterceptor],
            classOf[CacheMgmtInterceptor],
            classOf[StateTransferInterceptor],
            classOf[TransactionSynchronizerInterceptor],
            classOf[TxInterceptor[_, _]],
            classOf[OptimisticLockingInterceptor],
            classOf[NotificationInterceptor],
            classOf[EntryWrappingInterceptor],
            classOf[TxDistributionInterceptor],
            classOf[CallInterceptor]
          )
          )
      }
    }
Invalidation Cache

Invalidation Cacheの場合は、TxInterceptor、NotificationInterceptorが追加され、一部のInterceptorがOptimisticLockingInterceptorになります。

    it("transactional invalidation-cache") {
      withCache[String, String]("invalidationTxCache") { cache =>
        cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs (
          Seq(
            classOf[InvocationContextInterceptor],
            classOf[CacheMgmtInterceptor],
            classOf[TxInterceptor[_, _]],
            classOf[OptimisticLockingInterceptor],
            classOf[NotificationInterceptor],
            classOf[EntryWrappingInterceptor],
            classOf[InvalidationInterceptor],
            classOf[CallInterceptor]
          )
          )
      }
    }

とりあえず今回は単純にNON_XAにしただけですが、ロック戦略を変えた場合はOptimisticLockingInterceptorからPessimisticLockingInterceptorになるでしょうし、その他の設定でもいろいろ変わるでしょう。

Distributed Cache+L1 Cache

あと、構成のバリエーションとして、Distributed CacheにL1 Cacheを付けてみましょう。Non TransactionalなCache、TransactionalなCacheそれぞれで試してみました。

    it("l1 enabled, distributed-cache") {
      withCache[String, String]("distributedL1Cache") { cache =>
        cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs (
          Seq(
            classOf[DistributionBulkInterceptor[_, _]],
            classOf[InvocationContextInterceptor],
            classOf[CacheMgmtInterceptor],
            classOf[StateTransferInterceptor],
            classOf[NonTransactionalLockingInterceptor],
            classOf[L1LastChanceInterceptor],
            classOf[EntryWrappingInterceptor],
            classOf[L1NonTxInterceptor],
            classOf[NonTxDistributionInterceptor],
            classOf[CallInterceptor]
          )
          )
      }
    }

    it("l1 enabled, transactional distributed-cache") {
      withCache[String, String]("distributedL1TxCache") { cache =>
        cache.getAdvancedCache.getInterceptorChain.asScala.map(_.getClass) should contain theSameElementsInOrderAs (
          Seq(
            classOf[DistributionBulkInterceptor[_, _]],
            classOf[InvocationContextInterceptor],
            classOf[CacheMgmtInterceptor],
            classOf[StateTransferInterceptor],
            classOf[TransactionSynchronizerInterceptor],
            classOf[TxInterceptor[_, _]],
            classOf[OptimisticLockingInterceptor],
            classOf[NotificationInterceptor],
            classOf[L1LastChanceInterceptor],
            classOf[EntryWrappingInterceptor],
            classOf[L1TxInterceptor],
            classOf[TxDistributionInterceptor],
            classOf[CallInterceptor]
          )
          )
      }
    }

L1LastChanceInterceptorと、L1TxInterceptorが増えていますね。

Distributed Cacheでput/getを動かした時の、InterceptorからCommandが呼び出されるまでをトレースしてみる

最後に、Distributed Cacheのput/getをした時に、InterceptorのChainからCommandが呼び出されるまでの間をトレースしてみたいと思います。

トレースはBytemanで行いました。Visitor(Interceptor)の各種visit◯×メソッドのentry/exit時、ReplicableCommand#performのentry/exit時にコンソール出力するようにしています。

ルールはこちら。
https://github.com/kazuhira-r/infinispan-getting-started/blob/master/embedded-print-interceptors/trace.btm

このルールは、自動生成しています。
https://github.com/kazuhira-r/infinispan-getting-started/blob/master/embedded-print-interceptors/visitor-trace-rule-generate.sh

で、テストコードを実行すると

    it("distributed-cache, put/get") {
      withCache[String, String]("distributedCache") { cache =>
        cache.put("key1", "value1")
        cache.get("key1") should be("value1")
      }
    }

こういう結果が得られます。
※インデントを入れて、少し見やすくしました

put。

[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                [Command] PutKeyValueCommand:perform, entry
                [Command] PutKeyValueCommand:perform, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit

get。

[Interceptor] DistributionBulkInterceptor:visitGetKeyValueCommand, entry
  [Interceptor] InvocationContextInterceptor:visitGetKeyValueCommand, entry
    [Interceptor] CacheMgmtInterceptor:visitGetKeyValueCommand, entry
      [Interceptor] StateTransferInterceptor:visitGetKeyValueCommand, entry
        [Interceptor] NonTransactionalLockingInterceptor:visitGetKeyValueCommand, entry
          [Interceptor] EntryWrappingInterceptor:visitGetKeyValueCommand, entry
            [Interceptor] NonTxDistributionInterceptor:visitGetKeyValueCommand, entry
              [Interceptor] CallInterceptor:visitGetKeyValueCommand, entry
                [Command] GetKeyValueCommand:perform, entry
                [Command] GetKeyValueCommand:perform, exit
              [Interceptor] CallInterceptor:visitGetKeyValueCommand, exit
            [Interceptor] NonTxDistributionInterceptor:visitGetKeyValueCommand, exit
          [Interceptor] EntryWrappingInterceptor:visitGetKeyValueCommand, exit
        [Interceptor] NonTransactionalLockingInterceptor:visitGetKeyValueCommand, exit
      [Interceptor] StateTransferInterceptor:visitGetKeyValueCommand, exit
    [Interceptor] CacheMgmtInterceptor:visitGetKeyValueCommand, exit
  [Interceptor] InvocationContextInterceptor:visitGetKeyValueCommand, exit
[Interceptor] DistributionBulkInterceptor:visitGetKeyValueCommand, exit

Interceptorのvisitメソッドが次々と呼ばれ、最後にPutKeyValueCommandやGetKeyValueCommandのperformメソッドが呼び出されていることがわかります。

では、PutKeyValueCommandやGetKeyValueCommandなどのクラスが必要に応じてネットワーク通信などをするのかというと、そうでもないようです。

このあたりは、もう少しInterceptorやCommandの中身に踏み込む時に見ていきたいと思います。

追記
今回は、単一Nodeで動作させたので、すごくシンプルな結果になりました。ホントはRPCとか発生するはずなのですが…このあたりをトレースしてみた、別のエントリを書きました。

InfinispanのDistributed Cacheのput/getをトレースする(Local NodeがOnwerの場合) - CLOVER
InfinispanのDistributed Cacheのput/getをトレースする(Local NodeがOnwerではない場合) - CLOVER

ただ、このあたりの実装はInfinispan 9.0.0でけっこう変わりそうですけどね。まずは基礎的に追ってみようということで。

まとめ

今回は、Infinispanの内部で使われているInterceptorが、Cacheの種類や設定で、どのように変わっていくかの一例を見てみました。

設定が変わると、Interceptorが追加されたり変わったりして挙動が変わっていくような雰囲気が感じられますね。

そのうち、もうちょっとこの詳細を追ってみましょう。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-print-interceptors