CLOVER🍀

That was when it all began.

InfinispanのKeyPartitionerを使って、エントリの配置先Segmentをコントロールする

InfinispanのDistributed Streams APIのドキュメントを読んでいて、Segment based filteringという機能が増えていたことに気付きました。

Segment based filtering

Javadocを見ると、Infinispan 8.2からの機能みたいですね、気付いていませんでした。

KeyPartitioner (Infinispan JavaDoc 8.2.11.Final API)

どういう機能かというと…。

まず、使える対象はReplicated CacheとDistributed Cacheであることが前提となります。この機能を使うことで、特定のデータのサブセット上での操作を可能にします。これはKeyPartitionerにより決定されるもので、CacheStreamのfilterKeySegmentsメソッドを利用することで、Segmentをフィルタリングすることができるようになります。適用タイミングはキーによるフィルタの後ですが、中間操作が実行される前になります。

要するに、KeyPartitionerを使用してエントリの配置先Segmentをコントロールできるようになるということと、Distributed Streams APIを使う時にSegmentでフィルタリングをかけて対象のSegmentとデータをコントロールできる、ということみたいです。

近いような話としてGrouping APIがありますが、こちらは配置先のNodeのコントロールになります。
※とはいえ、実装を見ているとGrouping APIもKeyPartitionerになったような?

The Grouping API

KeyPartitionerの場合は、もうひとつ小さい単位、Segmentでのコントロールです。

Segmentというのはこちらを見るとよいですが、クラスタ内を分割した領域で、この中にエントリが配置されます。

Hashing Algorithms

複数のSegmentをNodeが保持するわけですが、各NodeがSegmentのOwnerという扱いになります。また、最初にSegmentが割り当てられたNodeがPrimaryOwnerと呼ばれます。その他のNodeは、バックアップというわけですね。

クラスタ内でのNodeに増減があった際にデータが移動しますが、その単位はこのSegment単位となります。デフォルトのSegment数は256です。

各Nodeへの配置方法を決めるのが、ConsistentHashFactoryとなります。

少し話がそれましたが、KeyPartitionerに話を戻します。ドキュメントによると、この機能は高度な機能であり、InfinispanのSegmentとハッシュに関する深い知識が必要だということです。Segmentベースのフィルタリングは、Segmentごとにデータを扱って処理を行いたい場合に役立つと考えられているようで、Apache Sparkなどと統合に便利なのだとか。

というわけで、使いこなすのは難しそうですが、とりあえず挙動をみてみましょう。

準備

ビルド定義から。
build.sbt

name := "embedded-key-partitioner"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.8"

updateOptions := updateOptions.value.withCachedResolution(true)

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xexperimental")

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "8.2.1.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

動作確認は、テストコードで行います。

Infinispanの設定は大枠こんな感じですが、「あとで」と書いているところにはDistributed Cacheを定義します。そちらは後ほど。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd"
        xmlns="urn:infinispan:config:8.2">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container default-cache="localCache">
        <jmx duplicate-domains="true"/>
        <transport cluster="cluster" stack="udp"/>

        <local-cache name="localCache"/>

        <!-- あとで -->
    </cache-container>
</infinispan>

JGroupsの設定は、端折ります。

テストコードの雛形

先に、動作確認のためのテストコードの雛形を定義します。
src/test/scala/org/littlewings/keypartitioner/KeyPartitionerSpec.scala

package org.littlewings.keypartitioner

import java.util.stream.Collectors

import org.infinispan.{Cache, CacheStream}
import org.infinispan.distribution.ch.impl.HashFunctionPartitioner
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.stream.CacheCollectors
import org.scalatest.{FunSpec, Matchers}

import scala.collection.JavaConverters._

class KeyPartitionerSpec extends FunSpec with Matchers {
  describe("Infinispan Key Partitioner") {
    // ここに、テストを書く!
  }

  protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))
    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers(0).getCache[K, V](cacheName)
      fun(cache)
    } finally {
      managers.foreach(_.getCache[K, V](cacheName).stop())
      managers.foreach(_.stop())
    }
  }
}

簡易的にクラスタを構成する、ヘルパーメソッド付き。

デフォルト設定の確認

自分でKeyPartitionerを作成する前に、デフォルトの実装を確認してみます。

Infinispanの設定ファイルには、デフォルト状態のDistributed Cacheを定義します。

        <distributed-cache name="defaultDistributedCache"/>

KeyPartitionerは、定義しないと利用されない、ではなくデフォルト実装があります。HashFunctionPartitionerというクラスらしいです。

    it("default KeyPartitioner is HashFunctionPartitioner.") {
      withCache[String, String]("defaultDistributedCache", 3) { cache =>
        cache.getCacheConfiguration.clustering.hash.keyPartitioner should be(a[HashFunctionPartitioner])
      }
    }

https://github.com/infinispan/infinispan/blob/8.2.1.Final/core/src/main/java/org/infinispan/configuration/cache/HashConfiguration.java#L28-L30

このKeyPartitionerの実装は、Segment数をもとに算出した値で、配置Segmentを決定します。単にSegment数というだけではなくて、いろいろ計算が入っていますが。

https://github.com/infinispan/infinispan/blob/8.2.1.Final/core/src/main/java/org/infinispan/distribution/ch/impl/HashFunctionPartitioner.java#L45
https://github.com/infinispan/infinispan/blob/8.2.1.Final/core/src/main/java/org/infinispan/distribution/ch/impl/HashFunctionPartitioner.java#L51
https://github.com/infinispan/infinispan/blob/8.2.1.Final/commons/src/main/java/org/infinispan/commons/util/Util.java#L751-L753

追記
Grouping APIを使用した場合は、GroupingPartitionerが利用されるようです。もしかしたら、通常はGrouping APIを使用すればたいていのケースは事足りるのかもしれません…。
https://github.com/infinispan/infinispan/blob/8.2.1.Final/core/src/main/java/org/infinispan/distribution/ch/impl/KeyPartitionerFactory.java#L40
https://github.com/infinispan/infinispan/blob/8.2.1.Final/core/src/main/java/org/infinispan/distribution/group/impl/GroupingPartitioner.java

また、先ほども紹介しましたが、デフォルトのSegment数は256です。

    it("default segment size is 256.") {
      withCache[String, String]("defaultDistributedCache", 3) { cache =>
        cache.getCacheConfiguration.clustering.hash.numSegments should be(256)
      }
    }

この状態でエントリを登録して、どのようにデータが各Segment、Nodeに配置されるのか見てみましょう。

    it("distributed hash partition.") {
      withCache[String, String]("defaultDistributedCache", 3) { cache =>
        for {
          i <- 1 to 10
          j <- 1 to 5
        } cache.put(s"key-${i}00-$j", s"value-${i}00-$j")

        val consistentHash = cache.getAdvancedCache.getDistributionManager.getConsistentHash
        val dm = cache.getAdvancedCache.getDistributionManager

        cache
          .keySet
          .asScala
          .toArray
          .sorted
          .foreach(key => println(s"key = ${key}, segment = ${consistentHash.getSegment(key)}, primaryOwner = ${dm.getPrimaryLocation(key)}"))
      }
    }

生成するキーおよび値は、「key-xxx-yyy」、「value-xxx-zzz」としています。

結果は、このように。

key = key-100-1, segment = 155, primaryOwner = node-26816
key = key-100-2, segment = 228, primaryOwner = node-26816
key = key-100-3, segment = 148, primaryOwner = node-27761
key = key-100-4, segment = 61, primaryOwner = node-27761
key = key-100-5, segment = 255, primaryOwner = node-26816

key = key-1000-1, segment = 198, primaryOwner = node-27761
key = key-1000-2, segment = 209, primaryOwner = node-26816
key = key-1000-3, segment = 33, primaryOwner = node-2686
key = key-1000-4, segment = 176, primaryOwner = node-2686
key = key-1000-5, segment = 165, primaryOwner = node-2686

key = key-200-1, segment = 203, primaryOwner = node-27761
key = key-200-2, segment = 31, primaryOwner = node-27761
key = key-200-3, segment = 204, primaryOwner = node-2686
key = key-200-4, segment = 177, primaryOwner = node-26816
key = key-200-5, segment = 89, primaryOwner = node-2686

key = key-300-1, segment = 162, primaryOwner = node-26816
key = key-300-2, segment = 78, primaryOwner = node-2686
key = key-300-3, segment = 80, primaryOwner = node-2686
key = key-300-4, segment = 124, primaryOwner = node-26816
key = key-300-5, segment = 244, primaryOwner = node-27761

key = key-400-1, segment = 240, primaryOwner = node-2686
key = key-400-2, segment = 94, primaryOwner = node-2686
key = key-400-3, segment = 253, primaryOwner = node-27761
key = key-400-4, segment = 39, primaryOwner = node-27761
key = key-400-5, segment = 129, primaryOwner = node-26816

key = key-500-1, segment = 47, primaryOwner = node-26816
key = key-500-2, segment = 182, primaryOwner = node-2686
key = key-500-3, segment = 165, primaryOwner = node-2686
key = key-500-4, segment = 94, primaryOwner = node-2686
key = key-500-5, segment = 66, primaryOwner = node-2686

key = key-600-1, segment = 161, primaryOwner = node-2686
key = key-600-2, segment = 36, primaryOwner = node-27761
key = key-600-3, segment = 68, primaryOwner = node-26816
key = key-600-4, segment = 166, primaryOwner = node-2686
key = key-600-5, segment = 10, primaryOwner = node-2686

key = key-700-1, segment = 107, primaryOwner = node-26816
key = key-700-2, segment = 113, primaryOwner = node-26816
key = key-700-3, segment = 248, primaryOwner = node-26816
key = key-700-4, segment = 96, primaryOwner = node-26816
key = key-700-5, segment = 255, primaryOwner = node-26816

key = key-800-1, segment = 31, primaryOwner = node-27761
key = key-800-2, segment = 226, primaryOwner = node-2686
key = key-800-3, segment = 92, primaryOwner = node-26816
key = key-800-4, segment = 108, primaryOwner = node-2686
key = key-800-5, segment = 235, primaryOwner = node-26816

key = key-900-1, segment = 145, primaryOwner = node-2686
key = key-900-2, segment = 226, primaryOwner = node-2686
key = key-900-3, segment = 84, primaryOwner = node-26816
key = key-900-4, segment = 223, primaryOwner = node-2686
key = key-900-5, segment = 141, primaryOwner = node-26816

見やすさのために行を分けましたが、見事にバラバラのSegment、Nodeに配置されていますね。

KeyPartitionerを実装する

前述の例はデフォルト設定なので、データと配置されるSegment(とNode)にあまり関連がありませんでした。

これを、「key-xxx-yyy」、「value-xxx-zzz」の「xxx」の部分で同じSegmentに配置されるように、簡単なKeyPartitionerを実装してみたいと思います。

で、作成したKeyPartitionerがこちら。
src/main/scala/org/littlewings/keypartitioner/MyKeyPartitioner.scala

package org.littlewings.keypartitioner

import org.infinispan.commons.util.Util
import org.infinispan.configuration.cache.HashConfiguration
import org.infinispan.distribution.ch.KeyPartitioner

class MyKeyPartitioner extends KeyPartitioner {
  var configuration: HashConfiguration = _

  override def init(configuration: HashConfiguration): Unit = {
    this.configuration = configuration
  }

  override def getSegment(key: AnyRef): Int =
    key.asInstanceOf[String].split("-")(1).toInt % configuration.numSegments
}

「key-xxx-yyy」の「xxx」の部分だけ抜き出して数字に変換して、Segment数で割った余りを取るだけの簡単なKeyPartitionerです。デフォルト実装よりもはるかに簡易なので、マジメな実装を見たい場合は、デフォルトのHashFunctionPartitionerを確認すればよいかと。

なお、KeyPartitionerを実装するにあたって最低限実装しなくてはいけないのはgetSegmentメソッドなのですが、これだけだとSegmentに関する情報が何もないので、initメソッドをオーバーライドしてHashConfigurationを持つように実装しています。HashFunctionPartitionerも近いことをしています。

initメソッドはKeyPartitionerインターフェースでデフォルト実装が書かれていますが、メソッドの中身は空なので実質はオーバーライドすることになるのではないかなと思います。

それでは、このKeyPartitionerを利用して確認してみましょう。

作成したKeyPartitionerの動作確認

Infinispanの設定ファイルに、Distributed Cacheを追加して、先ほど設定したKeyPartitionerを定義します。

        <distributed-cache name="customPartitionerDistributedCache"
                           key-partitioner="org.littlewings.keypartitioner.MyKeyPartitioner"/>

これで、KeyPartitionerが設定されました。

先ほど、エントリの配置状況を確認したものと、ほぼ同等のコードで確認してみます。
※ほぼ同等、というのは、作成されたKeyPartitionerが設定されているかどうかを確認するコードも入っているからです

      withCache[String, String]("customPartitionerDistributedCache", 3) { cache =>
        cache.getCacheConfiguration.clustering.hash.keyPartitioner should be(a[MyKeyPartitioner])

        for {
          i <- 1 to 10
          j <- 1 to 5
        } cache.put(s"key-${i}00-$j", s"value-${i}00-$j")

        val consistentHash = cache.getAdvancedCache.getDistributionManager.getConsistentHash
        val dm = cache.getAdvancedCache.getDistributionManager

        cache
          .keySet
          .asScala
          .toArray
          .sorted
          .foreach(key => println(s"key = ${key}, segment = ${consistentHash.getSegment(key)}, primaryOwner = ${dm.getPrimaryLocation(key)}"))
      }
    }

結果。

key = key-100-1, segment = 100, primaryOwner = node-56504
key = key-100-2, segment = 100, primaryOwner = node-56504
key = key-100-3, segment = 100, primaryOwner = node-56504
key = key-100-4, segment = 100, primaryOwner = node-56504
key = key-100-5, segment = 100, primaryOwner = node-56504

key = key-1000-1, segment = 232, primaryOwner = node-12128
key = key-1000-2, segment = 232, primaryOwner = node-12128
key = key-1000-3, segment = 232, primaryOwner = node-12128
key = key-1000-4, segment = 232, primaryOwner = node-12128
key = key-1000-5, segment = 232, primaryOwner = node-12128

key = key-200-1, segment = 200, primaryOwner = node-56504
key = key-200-2, segment = 200, primaryOwner = node-56504
key = key-200-3, segment = 200, primaryOwner = node-56504
key = key-200-4, segment = 200, primaryOwner = node-56504
key = key-200-5, segment = 200, primaryOwner = node-56504

key = key-300-1, segment = 44, primaryOwner = node-12128
key = key-300-2, segment = 44, primaryOwner = node-12128
key = key-300-3, segment = 44, primaryOwner = node-12128
key = key-300-4, segment = 44, primaryOwner = node-12128
key = key-300-5, segment = 44, primaryOwner = node-12128

key = key-400-1, segment = 144, primaryOwner = node-21416
key = key-400-2, segment = 144, primaryOwner = node-21416
key = key-400-3, segment = 144, primaryOwner = node-21416
key = key-400-4, segment = 144, primaryOwner = node-21416
key = key-400-5, segment = 144, primaryOwner = node-21416

key = key-500-1, segment = 244, primaryOwner = node-12128
key = key-500-2, segment = 244, primaryOwner = node-12128
key = key-500-3, segment = 244, primaryOwner = node-12128
key = key-500-4, segment = 244, primaryOwner = node-12128
key = key-500-5, segment = 244, primaryOwner = node-12128

key = key-600-1, segment = 88, primaryOwner = node-56504
key = key-600-2, segment = 88, primaryOwner = node-56504
key = key-600-3, segment = 88, primaryOwner = node-56504
key = key-600-4, segment = 88, primaryOwner = node-56504
key = key-600-5, segment = 88, primaryOwner = node-56504

key = key-700-1, segment = 188, primaryOwner = node-56504
key = key-700-2, segment = 188, primaryOwner = node-56504
key = key-700-3, segment = 188, primaryOwner = node-56504
key = key-700-4, segment = 188, primaryOwner = node-56504
key = key-700-5, segment = 188, primaryOwner = node-56504

key = key-800-1, segment = 32, primaryOwner = node-21416
key = key-800-2, segment = 32, primaryOwner = node-21416
key = key-800-3, segment = 32, primaryOwner = node-21416
key = key-800-4, segment = 32, primaryOwner = node-21416
key = key-800-5, segment = 32, primaryOwner = node-21416

key = key-900-1, segment = 132, primaryOwner = node-21416
key = key-900-2, segment = 132, primaryOwner = node-21416
key = key-900-3, segment = 132, primaryOwner = node-21416
key = key-900-4, segment = 132, primaryOwner = node-21416
key = key-900-5, segment = 132, primaryOwner = node-21416

うまくSegmentごとにグルーピングできましたね!OKそうです。

Distributed Streams APIと合わせて使う

データの配置Segmentをコントロールできたわけですが、この機能はDistributed Streams APIと合わせて使うことを意図していたのでした。

というわけで、Distributed Streams APIとも合わせて使ってみます。

実装した内容は、こちら。

    it("Distributed Streams, using custom KeyPartitioner.") {
      withCache[String, String]("customPartitionerDistributedCache", 3) { cache =>
        for {
          i <- 1 to 10
          j <- 1 to 5
        } cache.put(s"key-${i}00-$j", s"value-${i}00-$j")

        val consistentHash = cache.getAdvancedCache.getDistributionManager.getConsistentHash
        val dm = cache.getAdvancedCache.getDistributionManager

        val targetSegments =
          Array("key-100-1", "key-500-1", "key-900-1")
            .map(consistentHash.getSegment)
            .map(Integer.valueOf)
            .toSet

        val stream =
          cache.entrySet.stream.asInstanceOf[CacheStream[java.util.Map.Entry[String, String]]]

        val results =
          try {
            stream
              .filterKeySegments(targetSegments.asJava)
              .map[String](e => e.getValue)
              .collect(CacheCollectors.serializableCollector[String, java.util.List[String]](() => Collectors.toList[String]))
          } finally {
            stream.close()
          }

        results
          .asScala
          .foreach { value =>
            println(s"value = ${value}, segment = ${consistentHash.getSegment(value.replace("value", "key"))}, primaryOwner = ${dm.getPrimaryLocation(value.replace("value", "key"))}")
          }
      }
    }

今回のKeyPartitionerだと「key-xxx-yyy」の「xxx」の部分でグルーピングするので、「xxx」が同じデータは、同じSegmentにマッピングされます。なので、「xxx」の部分でひとつキーを決めると、対象のSegmentを決めることができます。

ここでは、3つのSegmentを選んでみました。ConsistentHash#getSegmentで、キーに対応するSegmentを取得することができます。

        val targetSegments =
          Array("key-100-1", "key-500-1", "key-900-1")
            .map(consistentHash.getSegment)
            .map(Integer.valueOf)
            .toSet

Distributed Streams APIでの適用時には、ここで決めたSegmentのSetをCacheStream#filterKeySegmentsに渡してフィルタリングします。

        val stream =
          cache.entrySet.stream.asInstanceOf[CacheStream[java.util.Map.Entry[String, String]]]

        val results =
          try {
            stream
              .filterKeySegments(targetSegments.asJava)
              .map[String](e => e.getValue)
              .collect(CacheCollectors.serializableCollector[String, java.util.List[String]](() => Collectors.toList[String]))
          } finally {
            stream.close()
          }

Streamを使った処理そのものは、Map.Entryから値を抽出しただけの簡単なものです。

最後は結果を出力しますが、SegmentやNodeの情報も出すために値からキーを戻して(機械的なルールで戻せるので…)確認してみます。

        results
          .asScala
          .foreach { value =>
            println(s"value = ${value}, segment = ${consistentHash.getSegment(value.replace("value", "key"))}, primaryOwner = ${dm.getPrimaryLocation(value.replace("value", "key"))}")

結果は、このように。

value = value-100-1, segment = 100, primaryOwner = ikaruga-ubuntu-41225
value = value-100-4, segment = 100, primaryOwner = ikaruga-ubuntu-41225
value = value-100-5, segment = 100, primaryOwner = ikaruga-ubuntu-41225
value = value-100-2, segment = 100, primaryOwner = ikaruga-ubuntu-41225
value = value-100-3, segment = 100, primaryOwner = ikaruga-ubuntu-41225

value = value-900-1, segment = 132, primaryOwner = ikaruga-ubuntu-5504
value = value-900-4, segment = 132, primaryOwner = ikaruga-ubuntu-5504
value = value-900-5, segment = 132, primaryOwner = ikaruga-ubuntu-5504
value = value-900-2, segment = 132, primaryOwner = ikaruga-ubuntu-5504
value = value-900-3, segment = 132, primaryOwner = ikaruga-ubuntu-5504

value = value-500-4, segment = 244, primaryOwner = ikaruga-ubuntu-5504
value = value-500-5, segment = 244, primaryOwner = ikaruga-ubuntu-5504
value = value-500-2, segment = 244, primaryOwner = ikaruga-ubuntu-5504
value = value-500-3, segment = 244, primaryOwner = ikaruga-ubuntu-5504
value = value-500-1, segment = 244, primaryOwner = ikaruga-ubuntu-5504

指定したSegmentに該当するデータのみが抽出できていますね、うまく指定のSegmentでフィルタリングできたようです。

まとめ

KeyPartitionerを使って、データの配置先のSegmentをコントロールし、Distributed Streams APIでフィルタリングするところまでやってみました。

機能としては面白いのですが、ちゃんと理解していないと特定のSegmentにデータが偏ったりして後悔することになりそうですよね。使う時には、ちゃんと考えましょう、と…。
たいていのケースは、Grouping APIで事足りそうな気もしますけれど、どうなのでしょうね。

個人的には、こういう話は興味がありますが。

今回作成したコードは、こちらに置いています。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-key-partitioner