CLOVER🍀

That was when it all began.

InfinispanのDistributed Entry Iteratorを試す

Infinispan 7.0で、Distributed Entry Iteratorというものが追加されたようです。

Infinispan 7.0 Release Notes
http://infinispan.org/infinispan-7.0/

ドキュメント上は、「Entry Retrieval」という節になっていますが。

2.2.2. Entry Retrieval
http://infinispan.org/docs/7.0.x/user_guide/user_guide.html#_entry_retrieval

どのようなものかというと、

  • 要はCacheに保存されたエントリ全体に対するIterator
  • KeyValueFilterインターフェースの実装を使用することで、Iteratorが取得するエントリのフィルタリングが可能
  • Converterを使用することで、エントリの変換が可能
  • removeオペレーションのサポート。ただし、トランザクション内であっても即時反映

といった代物。これにより、単一のNodeでクラスタ上のエントリに対してのイテレーションが実現できます、と。

Infinispan 7.0から追加された、インデックスを使わないQuery DSLも、リフレクションとこの仕組みを使って実現しているようです(だから、フルスキャン)。

なお、Iteratorがメモリに持つコンテンツのサイズは、state-transferでのchunk-sizeで決まる模様。

では、使ってみましょう。

依存関係の定義と設定ファイル

とりあえず、依存関係の定義から。
build.sbt

name := "entry-retrieval"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.5"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := true

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "7.0.3.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.3" % "test"
)

Infinispanの設定ファイルは、このように。
src/test/resources/infinispan.xml

rc/test/resources/infinispan.xml 
<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:7.0 http://www.infinispan.org/schemas/infinispan-config-7.0.xsd"
    xmlns="urn:infinispan:config:7.0">
  <jgroups>
    <stack-file name="udp" path="jgroups.xml" />
  </jgroups>

  <cache-container name="cacheManager" shutdown-hook="REGISTER">
    <transport cluster="cluster" stack="udp" />
    <jmx duplicate-domains="true" />

    <distributed-cache name="iterationCache" />
  </cache-container>
</infinispan>

JGroupsの設定は端折ります。せっかくなので、Cacheの定義はDistributed Cacheとしました。

使ってみる

それでは、テストコードと合わせて使っていってみたいと思います。

テストコードのベースは、こんな感じで。
src/test/scala/org/littlewings/infinispan/entryretrieval/EntryRetrievalSpec.scala

package org.littlewings.infinispan.entryretrieval

import scala.collection.JavaConverters._

import java.util.concurrent.{ Callable, CountDownLatch, Executors }

import org.infinispan.Cache
import org.infinispan.filter.{ AcceptAllKeyValueFilter, CollectionKeyFilter, KeyFilterAsKeyValueFilter, KeyValueFilter }
import org.infinispan.manager.DefaultCacheManager

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class EntryRetrievalSpec extends FunSpec {
  describe("Distributed Entry Iterator Spec") {
    // ここに、テストを書く!
  }

  def withCache[K, V](numInstances: Int, cacheName: String)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances) map (_ => new DefaultCacheManager("infinispan.xml"))
    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers.head.getCache[K, V](cacheName)
      fun(cache)
    } finally {
      managers.foreach(_.stop())
    }
  }
}

クラスタを構成できる簡易メソッドを定義して、この中でテストを書きつつ使ってみます。

全件取得

Infinispan的にはあまり推奨していないですが(全エントリを引っ張ってくるので)、まずはCacheに登録した全エントリを取得するイテレーションを行ってみます。

コードは、こんな感じ。

    it("iteration and sum") {
      withCache[String, Integer](3, "iterationCache") { cache =>
        (1 to 20).foreach(i => cache.put(s"key$i", i))

        val iterable =
          cache.getAdvancedCache.filterEntries(AcceptAllKeyValueFilter.getInstance)
        try {
          val sum =
            iterable.asScala.foldLeft(0) { (acc, cur) => acc + cur.getValue }
          sum should be (210)
        } finally {
          iterable.close()
        }
      }
    }

起点になるのはCacheから取得できるAdvancedCacheで、これに定義されたfilterEntriesというメソッドにKeyValueFilterのインスタンスを渡すことで、EntryIterableのインスタンスが取得できます。これは、Iterableの拡張です。

今回は、ここに全件取得を表すAcceptAllKeyValueFilterクラスのインスタンスを使うことで、全件取得を行っています。

        val iterable =
          cache.getAdvancedCache.filterEntries(AcceptAllKeyValueFilter.getInstance)

あとは通常のIterableとして使えばいいのですが、最後にcloseをしましょう。

        try {
          val sum =
            iterable.asScala.foldLeft(0) { (acc, cur) => acc + cur.getValue }
          sum should be (210)
        } finally {
          iterable.close()
        }

ちなみに、このコードはScalaのIterableに変換しています…。また、イテレーションの結果としてはsumを取りました。

イテレーション時に渡されるクラスですが、CacheEntryというMap.Entryの拡張が渡ってくるので、ここから必要に応じてキーなり値なりを取得して判定しましょう。

自分でKeyValueFilterを定義する

InfinispanにもKeyValueFilterの定義はありますが、自分で定義することも可能です。

KeyValueFilterインターフェースの実装を定義すればOKです。
src/main/scala/org/littlewings/infinispan/entryretrieval/EvenKeyValueFilter.scala

package org.littlewings.infinispan.entryretrieval

import org.infinispan.filter.KeyValueFilter
import org.infinispan.metadata.Metadata

@SerialVersionUID(1L)
class EvenKeyValueFilter extends KeyValueFilter[String, Integer] with Serializable {
  override def accept(key: String, value: Integer, metadata: Metadata): Boolean =
    key.replaceAll("key", "").toInt % 2 == 0
}

acceptメソッドを実装して、このメソッドがtrueを返すようにすれば、そのエントリはイテレーションでの取得対象となります。ここでは、決め打ちですがキーをStringでもらうとして、「keyXX」のXXの部分が偶数なら対象とするフィルタとしました。
※例がひどい…

また、KeyValueFilterやこの後名前だけ登場するConverterなどは、Serializableであること、またはInfinispanのExternalizerを用意する必要があります。

テストコード。

    it("even key value filter") {
      withCache[String, Integer](3, "iterationCache") { cache =>
        (1 to 20).foreach(i => cache.put(s"key$i", i))

        val iterable =
          cache.getAdvancedCache.filterEntries(new EvenKeyValueFilter)
        try {
          val sum =
            iterable.asScala.foldLeft(0) { (acc, cur) => acc + cur.getValue }
          sum should be (110)
        } finally {
          iterable.close()
        }
      }
    }

例は、今回もsum。

KeyFilterをKeyValueFilterとして使う

突如名前が登場しましたが、Infinispan 6でKeyFilterというものが追加されていたようですが、これをKeyValueFilterとして使うことができます。

ここでは、このうちキーのコレクションを扱うCollectionKeyFilterと、KeyFilterをKeyValueFilterとして使うKeyFilterAsKeyValueFilterを組み合わせて使ってみます。

    it("collection key filter") {
      withCache[String, Integer](3, "iterationCache") { cache =>
        (1 to 20).foreach(i => cache.put(s"key$i", i))

        val filter: KeyValueFilter[String, Integer] =
          new KeyFilterAsKeyValueFilter(new CollectionKeyFilter(List("key3", "key5").asJava, true))
        val iterable =
          cache.getAdvancedCache.filterEntries(filter)

        try {
          val values = iterable.asScala.map(_.getValue)
          values should contain allOf (3, 5)
        } finally {
          iterable.close()
        }
      }
    }

ここの意味ですが、キーのコレクションを対象にするCollectionKeyFilterに「key3」と「key5」を対象にするように指定してインスタンスを作成し、それをKeyValueFilterとして使うような定義となっています。

        val filter: KeyValueFilter[String, Integer] =
          new KeyFilterAsKeyValueFilter(new CollectionKeyFilter(List("key3", "key5").asJava, true))

CollectionKeyFilterのコンストラクタの第2引数ですが、指定しない場合はfalseで、この場合「イテレーションでの取得対象外」を定義することになるようです。今回は、取得対象として定義したかったのでtrueを明示的に指定しました。

結果、「key3」と「key5」に対応するエントリが取得できています。

          val values = iterable.asScala.map(_.getValue)
          values should contain allOf (3, 5)

KeyValueFilterConverterを使う

KeyValueFilterは、イテレーションでの取得対象を決定するだけですが、この拡張インターフェースであるKeyValueFilterConverterを使用すると、フィルタリングと変換を同時に行うことができます。
※単独でConverterを作成してもOKそうですが

このインターフェースの実装を作成するには、直接インターフェースを実装するのではなくAbstractKeyValueFilterConverterクラスを継承するのがよさそうです。

今回は、先ほどの「keyXX」の偶数を対象にすることに加え、値を2倍するKeyValueFilterConverterの実装を作成します。
src/main/scala/org/littlewings/infinispan/entryretrieval/EvenDoublingKeyValueFiterConverter.scala

package org.littlewings.infinispan.entryretrieval

import org.infinispan.filter.AbstractKeyValueFilterConverter
import org.infinispan.metadata.Metadata

@SerialVersionUID(1L)
class EvenDoublingKeyValueFilterConverter
    extends AbstractKeyValueFilterConverter[String, Integer, Integer]
    with Serializable {
  override def filterAndConvert(key: String, value: Integer, metadata: Metadata): Integer =
    if ((key.replaceAll("key", "").toInt % 2) == 0)
      value * 2
    else
      null
}

AbstractKeyValueFilterConverterクラスを継承する場合、filterAndConvertメソッドを実装すればOKです。

filterAndConvertメソッドがnullを返すとイテレーションの対象外となり、null以外を返すとその値がイテレーションでのエントリの値となります。

今回定義した、KeyValueFilterConverterを使用したテストコード。

    it("even doubling filter and converter") {
      withCache[String, Integer](3, "iterationCache") { cache =>
        (1 to 20).foreach(i => cache.put(s"key$i", i))

        val iterable =
          cache.getAdvancedCache.filterEntries(new EvenDoublingKeyValueFilterConverter)
        try {
          val values = iterable.asScala.foldLeft(0) { (acc, cur) => acc + cur.getValue }
          values should be (220)
        } finally {
          iterable.close()
        }
      }

ちなみに、単純なConverterというインターフェースも存在していて、EntryIterableからさらにconverterメソッドを呼び出すことで、KeyVaueFilterとは別にConveterを独立して設定することも可能なようです。今回は試していないですけど。
以下、ドキュメントのサンプル。

advancedCache.filterEntries(teslaCarFilter).converter(new CarWheelConverter(3)) {
   for (CacheEntry<String, Wheel> entry : iterable) {
      // Do something with the third wheel of the car
   }
}

あと、ドキュメント上はIteratorトランザクションとの関連についても書かれていて、現在のトランザクションに従うものの、すべてのエントリをIteratorに持たせることができないため、イテレーションで取得するエントリはトランザクションコンテキストに追加されないとのことです。

つまり、分離レベルをREPEATABLE_READにしても、READ_COMMITTEDのような動作に見えるだろうということ。

これ、普通のトランザクションのコードを書いててよくわからなくなったので、いつか時間があれば確認してみるかも…。

とりあえず、こんなところです。

今回書いたコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-entry-retrieval