CLOVER🍀

That was when it all began.

Infinispan 9.1で追加された、Scattered Cacheを試す

先日、Infinispan 9.1.0.Finalがリリースされました。

Infinispan: Infinispan 9.1 "Bastille"

新機能もいくつか増えていますが、今回は新しいCache、「Scattered Cache」を試してみたいと思います。

Scattered cache

A new clustered cache, similar to a distributed cache, but with a higher write throughput.

http://blog.infinispan.org/2017/07/infinispan-91-bastille.html

Scattered Cacheとは?

ドキュメントについては、こちら。

Scattered Mode

ざっと抜き出すと、こういう性格みたいです

  • クラスタリングすることで線形にスケールするCacheで、Distributed Cacheによく似ている
  • Distributed Cacheとは異なり、データの位置は固定されていない
  • Consistent Hashアルゴリズムを使用して、Primary Ownerを決定する
  • バックアップは、前回データを書き込んだNodeに格納される
  • バックアップの正確な位置は重要ではない(というか、わからない?)

バックアップについては、いまひとつ掴めないような…。

  • Distributed Cacheと異なり、書き込みが単一のRPCとなる利点がある
  • 読み取りは、常にPrimary Ownerを対象にする必要がある
  • 結果、書き込みは速くなり、読み込みは遅くなる可能性がある

つまり、書き込みが集中するアプリケーションに適したCacheですと。

  • 複数のバックアップコピーを保持した場合、メモリの消費量が増加する
  • 期限切れのバックアップコピーを削除するため、無効化するためのメッセージをクラスタ内にブロードキャストするが、オーバーヘッドがある
  • とても大きなクラスタだと、パフォーマンスが低下する

バックアップがあちこちにできそうな感じですね、それを無効化するメッセージをブロードキャストしてデータの
状態を保つ、と。

  • NodeがクラッシュしてPrimaryが失われた場合は、クラスタ内のバックアッププロセスを調整して、最後に書き込まれたバックアップコピーを探す
  • このため、ネットワークトラフィックが増加する
  • データライターもバックアップであるため、トランスポートレベルでmachine/rack/siteのIDを指定しても、同じmachine/rack/siteで複数の障害が発生した場合にクラスタを回復することができない
  • トランザクションと非同期レプリケーションはサポートされていない

といった感じです。

なんか、バックアップがあちこちにありそうなイメージの説明です。で、それを無効化する…と。

まあ、説明の読み解きはこのくらいにして、使ってみましょう。

準備

sbtでの依存関係の定義は、こちら。

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "9.1.0.Final" % Compile,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.3" % Test
)

ScalaTestは、テストコード用です。

Infinispanの設定ファイルは、こちら。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:9.1 http://www.infinispan.org/schemas/infinispan-config-9.1.xsd"
        xmlns="urn:infinispan:config:9.1">
    <jgroups>
        <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/>
    </jgroups>
    <cache-container>
        <jmx duplicate-domains="true"/>
        <transport cluster="test-cluster" stack="udp"/>

        <!-- あとで -->

    </cache-container>
</infinispan>

テストコードの雛形

テストコードを書くにあたり、こんな雛形を用意。
src/test/scala/org/littlewings/infinispan/scattered/ScatteredCacheSpec.scala

package org.littlewings.infinispan.scattered

import java.util.concurrent.TimeUnit

import org.infinispan.Cache
import org.infinispan.commons.util.Util
import org.infinispan.distribution.group.impl.PartitionerConsistentHash
import org.infinispan.manager.{DefaultCacheManager, EmbeddedCacheManager}
import org.infinispan.util.function.SerializableToIntFunction
import org.scalatest.{FunSuite, Matchers}

class ScatteredCacheSpec extends FunSuite with Matchers {
  // あとでテストを書く!

  def withCache[K, V](cacheName: String, numInstances: Int = 1)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))

    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers(0).getCache[K, V](cacheName)

      fun(cache)
    } finally {
      managers.foreach(_.stop())
    }
  }

  def withCacheWithManagers[K, V](cacheName: String, numInstances: Int = 1)(fun: (Cache[K, V], IndexedSeq[EmbeddedCacheManager]) => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))

    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers(0).getCache[K, V](cacheName)

      fun(cache, managers)
    } finally {
      managers.foreach(_.stop())
    }
  }
}

簡単にクラスタを構成できるヘルパーメソッド付きですが、クラスタを構成する各EmbeddedCacheManagerを取得できるパターンも用意。

使ってみる

それでは、Scattered Cacheを使ってみます。

まずは、設定ファイルに最小構成で定義。

        <scattered-cache name="simpleScatteredCache"/>

Scattered Cacheの設定項目自体は、こちらを見るとよいでしょう。

urn:infinispan:config:9.1

とりあえず、Getting Started的な。

  test("simple use Scattered Cache") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

      (1 to 10).foreach(i => cache.get(s"key$i") should be(s"value$i"))

      cache should have size (10)
    }
  }

ふつーのCacheですね。

クラスタ内のNode数は、3としています。

    withCache[String, String]("simpleScatteredCache", 3) { cache =>

少し、設定まわりを見てみましょう。

  test("Scattered Cache, default configuration") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

      val dm = cache.getAdvancedCache.getDistributionManager
      dm.getReadConsistentHash.toString should startWith("PartitionerConsistentHash:ScatteredConsistentHash{ns=")
      dm.getWriteConsistentHash.toString should startWith("PartitionerConsistentHash:ScatteredConsistentHash{ns=")

      dm.getReadConsistentHash.getNumSegments should be(256)
      dm.getWriteConsistentHash.getNumSegments should be(256)

      cache.getCacheConfiguration.clustering.hash.numOwners should be(1)

      val cacheTopology = dm.getCacheTopology

      (1 to 10).foreach { i =>
        val distributionInfo = cacheTopology.getDistribution(s"key$i")
        distributionInfo.writeOwners should have size (1)
        distributionInfo.writeBackups should be(empty)
      }
    }
  }

ConsistentHashとしては、PartitionerConsistentHash/ScatteredConsistentHashという階層になっているようです。

Segment数は、256。

      dm.getReadConsistentHash.getNumSegments should be(256)
      dm.getWriteConsistentHash.getNumSegments should be(256)

データのオーナー数は、なんと1です。

      cache.getCacheConfiguration.clustering.hash.numOwners should be(1)

変更することもできません。これで、バックアップ大丈夫?とか思うのですが、これでも大丈夫です。

この状態なので、Owner数は1、バックアップ先は0となっていますけれど…。

      val cacheTopology = dm.getCacheTopology

      (1 to 10).foreach { i =>
        val distributionInfo = cacheTopology.getDistribution(s"key$i")
        distributionInfo.writeOwners should have size (1)
        distributionInfo.writeBackups should be(empty)
      }

なので、Nodeダウンの時の挙動が気になるわけですが

  test("Scattered Cache, node down") {
    withCacheWithManagers[String, String]("simpleScatteredCache", 3) { (cache, managers) =>
      (1 to 100).foreach(i => cache.put(s"key$i", s"value$i"))
      cache should have size (100)

      val anotherCache = managers(1).getCache[String, String]("simpleScatteredCache")
      anotherCache should have size (100)

      cache.getCacheManager.stop()
      TimeUnit.SECONDS.sleep(3L)
      anotherCache should have size (100)

      managers(2).stop()
      TimeUnit.SECONDS.sleep(3L)
      anotherCache should have size (100)

      anotherCache.stop()
    }
  }

急激に多重障害でも起こさなければ、Distributed Cacheと同じようにバックアップから復旧してくれるようです。

データの配置状況を確認してみましょう。

  test("Scattered Cache, data distribution") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

      val self = cache.getCacheManager.getAddress
      val dm = cache.getAdvancedCache.getDistributionManager
      val cacheTopology = dm.getCacheTopology

      println(s"self = $self")
      (1 to 10).foreach(i => println(cacheTopology.getDistribution(s"key$i").primary()))

      (1 to 10).foreach(i => cache.put(s"key$i", s"value2-$i"))

      println(s"self = $self")
      (1 to 10).foreach(i => println(cacheTopology.getDistribution(s"key$i").primary()))
    }
  }

1回、途中でputし直しています。

結果。なお、selfというのはLocal Nodeです。

## 1回目
self = xxxxx-61451
xxxxx-61451
xxxxx-61451
xxxxx-61451
xxxxx-16211
xxxxx-3970
xxxxx-61451
xxxxx-16211
xxxxx-61451
xxxxx-3970
xxxxx-3970


## 2回目
self = xxxxx-61451
xxxxx-61451
xxxxx-61451
xxxxx-61451
xxxxx-16211
xxxxx-3970
xxxxx-61451
xxxxx-16211
xxxxx-61451
xxxxx-3970
xxxxx-3970

やっぱり、バックアップ先が表示されませんけどね…。

Consistent Hashで、キーから配置先のSegmentを算出していることの確認。

  test("Scattered Cache, hash segment") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

      val dm = cache.getAdvancedCache.getDistributionManager
      val readConHash = dm.getReadConsistentHash.asInstanceOf[PartitionerConsistentHash]
      val writeConHash = dm.getWriteConsistentHash.asInstanceOf[PartitionerConsistentHash]

      val readSegmentSize = readConHash.getNumSegments
      val readHash = readConHash.getHashFunction
      val writeSegmentSize = writeConHash.getNumSegments
      val writeHash = writeConHash.getHashFunction

      (1 to 10).foreach { i =>
        val key = s"key$i"
        ((readHash.hash(key) & Integer.MAX_VALUE) / Util.getSegmentSize(readSegmentSize)) should be(readConHash.getSegment(key))
        ((writeHash.hash(key) & Integer.MAX_VALUE) / Util.getSegmentSize(writeSegmentSize)) should be(writeConHash.getSegment(key))
      }
    }
  }

Distributed Stream APIの使用も可能です。

  test("Scattered Cache, stream api") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

      val sum =
        cache
          .values
          .stream
          .mapToInt(new SerializableToIntFunction[String] {
            override def applyAsInt(value: String): Int =
              Integer.parseInt(value.replace("value", ""))
          })
          .sum

      sum should be(55)
    }
  }

データの配置をコントロールしてみましょう。

デフォルトだと、HashFunctionPartitionerが使用されます。

なので、データの配置状況を確認すると

  test("Scattered Cache, non key partitioner") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach { i =>
        val partition = (i % 3) + 1
        cache.put(s"key$partition-$i", s"value$partition-$i")
      }

      val dm = cache.getAdvancedCache.getDistributionManager
      val cacheTopology = dm.getCacheTopology

      val keys = cache.keySet

      keys.forEach { key =>
        println(s"key => $key")
        println(cacheTopology.getWriteOwners(key))
      }
    }
  }

まあ、バラバラになります。
※グルーピングのために、「keyN-M」としています

key => key3-5
[xxxxx-41610]
key => key3-8
[xxxxx-41610]
key => key3-2
[xxxxx-41610]
key => key1-9
[xxxxx-41610]
key => key2-7
[xxxxx-41610]
key => key2-4
[xxxxx-25138]
key => key2-10
[xxxxx-36570]
key => key1-3
[xxxxx-36570]
key => key1-6
[xxxxx-36570]
key => key2-1
[xxxxx-36570]

ここで、KeyPartitionerを作成してみます。内容は適当です。
src/test/scala/org/littlewings/infinispan/scattered/SimpleKeyPartitioner.scala

package org.littlewings.infinispan.scattered

import org.infinispan.configuration.cache.HashConfiguration
import org.infinispan.distribution.ch.KeyPartitioner

class SimpleKeyPartitioner extends KeyPartitioner {
  var configuration: HashConfiguration = _

  override def init(configuration: HashConfiguration): Unit = {
    this.configuration = configuration
  }

  override def getSegment(key: Any): Int =
    (Integer.parseInt(key.asInstanceOf[String].replaceAll("""key(\d+)-\d+""", "$1")) * 150 + 73) % configuration.numSegments
}

設定ファイル上では、このように設定。

        <scattered-cache name="keyPartitionedScatteredCache"
                         key-partitioner="org.littlewings.infinispan.scattered.SimpleKeyPartitioner"/>

あとは、テストを実行。

  test("Scattered Cache, key partitioner") {
    withCache[String, String]("keyPartitionedScatteredCache", 3) { cache =>
      (1 to 10).foreach { i =>
        val partition = (i % 3) + 1
        cache.put(s"key$partition-$i", s"value$partition-$i")
      }

      val dm = cache.getAdvancedCache.getDistributionManager
      val cacheTopology = dm.getCacheTopology

      val keys = cache.keySet

      keys.forEach { key =>
        println(s"key => $key")
        println(cacheTopology.getWriteOwners(key))
      }
    }
  }

とりあえず(?)、配置先が「keyN」ごとにグルーピングされましたね。

key => key2-10
[xxxxx-22564]
key => key1-9
[xxxxx-22564]
key => key1-3
[xxxxx-22564]
key => key1-6
[xxxxx-22564]
key => key2-7
[xxxxx-22564]
key => key2-4
[xxxxx-22564]
key => key2-1
[xxxxx-22564]
key => key3-5
[xxxxx-29791]
key => key3-8
[xxxxx-29791]
key => key3-2
[xxxxx-29791]

サンプルとしては、こんなところで。

少し中身を

と、ここまで試してみたところで、軽く中身の方を見ておきたいと思います。

説明が特徴的なRead/Writeのところですが、確かにPrimary Ownerにアクセスしにいくコードが目立ちます。

Rread - ScatteredDistributionInterceptor#handleReadCommand
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java#L595

Write - ScatteredDistributionInterceptor#handleWriteCommand
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java#L149

バックアップ先は、次の(?)Memberらしいですよ。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java#L569

バックアップとかの実装は、確かにある意味固定的(?)な感じが。

あと、今回の説明では記載しませんでしたが、Scattered Cacheの「invalidation-batch-size」という設定は、ScatteredVersionManagerImplクラスで
使用されます。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/scattered/impl/ScatteredVersionManagerImpl.java

登録・削除したキーを保持しておいて、その数が「invalidation-batch-size」を超えるとクリーンアップが始まります。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/scattered/impl/ScatteredVersionManagerImpl.java#L210
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/scattered/impl/ScatteredVersionManagerImpl.java#L465

登録の契機は、ScatteredDistributionInterceptorだったりします。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java

…とりあえず、このくらいで。

まとめ

Infinispan 9.1.0.Finalで追加された、Scattered Cacheを試してみました。内部的な挙動はちゃんと把握しきれていませんが、使い方と実装コードの
なんとなくの雰囲気はつかめたのでいいかなと思います。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-scattered-cache