InfinispanのDistributed Streams APIのドキュメントを読んでいて、Segment based filteringという機能が増えていたことに気付きました。
Javadocを見ると、Infinispan 8.2からの機能みたいですね、気付いていませんでした。
KeyPartitioner (Infinispan JavaDoc 8.2.11.Final API)
どういう機能かというと…。
まず、使える対象はReplicated CacheとDistributed Cacheであることが前提となります。この機能を使うことで、特定のデータのサブセット上での操作を可能にします。これはKeyPartitionerにより決定されるもので、CacheStreamのfilterKeySegmentsメソッドを利用することで、Segmentをフィルタリングすることができるようになります。適用タイミングはキーによるフィルタの後ですが、中間操作が実行される前になります。
要するに、KeyPartitionerを使用してエントリの配置先Segmentをコントロールできるようになるということと、Distributed Streams APIを使う時にSegmentでフィルタリングをかけて対象のSegmentとデータをコントロールできる、ということみたいです。
近いような話としてGrouping APIがありますが、こちらは配置先のNodeのコントロールになります。
※とはいえ、実装を見ているとGrouping APIもKeyPartitionerになったような?
KeyPartitionerの場合は、もうひとつ小さい単位、Segmentでのコントロールです。
Segmentというのはこちらを見るとよいですが、クラスタ内を分割した領域で、この中にエントリが配置されます。
複数のSegmentをNodeが保持するわけですが、各NodeがSegmentのOwnerという扱いになります。また、最初にSegmentが割り当てられたNodeがPrimaryOwnerと呼ばれます。その他のNodeは、バックアップというわけですね。
クラスタ内でのNodeに増減があった際にデータが移動しますが、その単位はこのSegment単位となります。デフォルトのSegment数は256です。
各Nodeへの配置方法を決めるのが、ConsistentHashFactoryとなります。
少し話がそれましたが、KeyPartitionerに話を戻します。ドキュメントによると、この機能は高度な機能であり、InfinispanのSegmentとハッシュに関する深い知識が必要だということです。Segmentベースのフィルタリングは、Segmentごとにデータを扱って処理を行いたい場合に役立つと考えられているようで、Apache Sparkなどと統合に便利なのだとか。
というわけで、使いこなすのは難しそうですが、とりあえず挙動をみてみましょう。
準備
ビルド定義から。
build.sbt
name := "embedded-key-partitioner" version := "0.0.1-SNAPSHOT" organization := "org.littlewings" scalaVersion := "2.11.8" updateOptions := updateOptions.value.withCachedResolution(true) scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xexperimental") parallelExecution in Test := false libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "8.2.1.Final", "net.jcip" % "jcip-annotations" % "1.0" % "provided", "org.scalatest" %% "scalatest" % "2.2.6" % "test" )
動作確認は、テストコードで行います。
Infinispanの設定は大枠こんな感じですが、「あとで」と書いているところにはDistributed Cacheを定義します。そちらは後ほど。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd" xmlns="urn:infinispan:config:8.2"> <jgroups> <stack-file name="udp" path="jgroups.xml"/> </jgroups> <cache-container default-cache="localCache"> <jmx duplicate-domains="true"/> <transport cluster="cluster" stack="udp"/> <local-cache name="localCache"/> <!-- あとで --> </cache-container> </infinispan>
JGroupsの設定は、端折ります。
テストコードの雛形
先に、動作確認のためのテストコードの雛形を定義します。
src/test/scala/org/littlewings/keypartitioner/KeyPartitionerSpec.scala
package org.littlewings.keypartitioner import java.util.stream.Collectors import org.infinispan.{Cache, CacheStream} import org.infinispan.distribution.ch.impl.HashFunctionPartitioner import org.infinispan.manager.DefaultCacheManager import org.infinispan.stream.CacheCollectors import org.scalatest.{FunSpec, Matchers} import scala.collection.JavaConverters._ class KeyPartitionerSpec extends FunSpec with Matchers { describe("Infinispan Key Partitioner") { // ここに、テストを書く! } protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(fun: Cache[K, V] => Unit): Unit = { val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml")) managers.foreach(_.getCache[K, V](cacheName)) try { val cache = managers(0).getCache[K, V](cacheName) fun(cache) } finally { managers.foreach(_.getCache[K, V](cacheName).stop()) managers.foreach(_.stop()) } } }
簡易的にクラスタを構成する、ヘルパーメソッド付き。
デフォルト設定の確認
自分でKeyPartitionerを作成する前に、デフォルトの実装を確認してみます。
Infinispanの設定ファイルには、デフォルト状態のDistributed Cacheを定義します。
<distributed-cache name="defaultDistributedCache"/>
KeyPartitionerは、定義しないと利用されない、ではなくデフォルト実装があります。HashFunctionPartitionerというクラスらしいです。
it("default KeyPartitioner is HashFunctionPartitioner.") { withCache[String, String]("defaultDistributedCache", 3) { cache => cache.getCacheConfiguration.clustering.hash.keyPartitioner should be(a[HashFunctionPartitioner]) } }
このKeyPartitionerの実装は、Segment数をもとに算出した値で、配置Segmentを決定します。単にSegment数というだけではなくて、いろいろ計算が入っていますが。
https://github.com/infinispan/infinispan/blob/8.2.1.Final/core/src/main/java/org/infinispan/distribution/ch/impl/HashFunctionPartitioner.java#L45
https://github.com/infinispan/infinispan/blob/8.2.1.Final/core/src/main/java/org/infinispan/distribution/ch/impl/HashFunctionPartitioner.java#L51
https://github.com/infinispan/infinispan/blob/8.2.1.Final/commons/src/main/java/org/infinispan/commons/util/Util.java#L751-L753
追記)
Grouping APIを使用した場合は、GroupingPartitionerが利用されるようです。もしかしたら、通常はGrouping APIを使用すればたいていのケースは事足りるのかもしれません…。
https://github.com/infinispan/infinispan/blob/8.2.1.Final/core/src/main/java/org/infinispan/distribution/ch/impl/KeyPartitionerFactory.java#L40
https://github.com/infinispan/infinispan/blob/8.2.1.Final/core/src/main/java/org/infinispan/distribution/group/impl/GroupingPartitioner.java
また、先ほども紹介しましたが、デフォルトのSegment数は256です。
it("default segment size is 256.") { withCache[String, String]("defaultDistributedCache", 3) { cache => cache.getCacheConfiguration.clustering.hash.numSegments should be(256) } }
この状態でエントリを登録して、どのようにデータが各Segment、Nodeに配置されるのか見てみましょう。
it("distributed hash partition.") { withCache[String, String]("defaultDistributedCache", 3) { cache => for { i <- 1 to 10 j <- 1 to 5 } cache.put(s"key-${i}00-$j", s"value-${i}00-$j") val consistentHash = cache.getAdvancedCache.getDistributionManager.getConsistentHash val dm = cache.getAdvancedCache.getDistributionManager cache .keySet .asScala .toArray .sorted .foreach(key => println(s"key = ${key}, segment = ${consistentHash.getSegment(key)}, primaryOwner = ${dm.getPrimaryLocation(key)}")) } }
生成するキーおよび値は、「key-xxx-yyy」、「value-xxx-zzz」としています。
結果は、このように。
key = key-100-1, segment = 155, primaryOwner = node-26816 key = key-100-2, segment = 228, primaryOwner = node-26816 key = key-100-3, segment = 148, primaryOwner = node-27761 key = key-100-4, segment = 61, primaryOwner = node-27761 key = key-100-5, segment = 255, primaryOwner = node-26816 key = key-1000-1, segment = 198, primaryOwner = node-27761 key = key-1000-2, segment = 209, primaryOwner = node-26816 key = key-1000-3, segment = 33, primaryOwner = node-2686 key = key-1000-4, segment = 176, primaryOwner = node-2686 key = key-1000-5, segment = 165, primaryOwner = node-2686 key = key-200-1, segment = 203, primaryOwner = node-27761 key = key-200-2, segment = 31, primaryOwner = node-27761 key = key-200-3, segment = 204, primaryOwner = node-2686 key = key-200-4, segment = 177, primaryOwner = node-26816 key = key-200-5, segment = 89, primaryOwner = node-2686 key = key-300-1, segment = 162, primaryOwner = node-26816 key = key-300-2, segment = 78, primaryOwner = node-2686 key = key-300-3, segment = 80, primaryOwner = node-2686 key = key-300-4, segment = 124, primaryOwner = node-26816 key = key-300-5, segment = 244, primaryOwner = node-27761 key = key-400-1, segment = 240, primaryOwner = node-2686 key = key-400-2, segment = 94, primaryOwner = node-2686 key = key-400-3, segment = 253, primaryOwner = node-27761 key = key-400-4, segment = 39, primaryOwner = node-27761 key = key-400-5, segment = 129, primaryOwner = node-26816 key = key-500-1, segment = 47, primaryOwner = node-26816 key = key-500-2, segment = 182, primaryOwner = node-2686 key = key-500-3, segment = 165, primaryOwner = node-2686 key = key-500-4, segment = 94, primaryOwner = node-2686 key = key-500-5, segment = 66, primaryOwner = node-2686 key = key-600-1, segment = 161, primaryOwner = node-2686 key = key-600-2, segment = 36, primaryOwner = node-27761 key = key-600-3, segment = 68, primaryOwner = node-26816 key = key-600-4, segment = 166, primaryOwner = node-2686 key = key-600-5, segment = 10, primaryOwner = node-2686 key = key-700-1, segment = 107, primaryOwner = node-26816 key = key-700-2, segment = 113, primaryOwner = node-26816 key = key-700-3, segment = 248, primaryOwner = node-26816 key = key-700-4, segment = 96, primaryOwner = node-26816 key = key-700-5, segment = 255, primaryOwner = node-26816 key = key-800-1, segment = 31, primaryOwner = node-27761 key = key-800-2, segment = 226, primaryOwner = node-2686 key = key-800-3, segment = 92, primaryOwner = node-26816 key = key-800-4, segment = 108, primaryOwner = node-2686 key = key-800-5, segment = 235, primaryOwner = node-26816 key = key-900-1, segment = 145, primaryOwner = node-2686 key = key-900-2, segment = 226, primaryOwner = node-2686 key = key-900-3, segment = 84, primaryOwner = node-26816 key = key-900-4, segment = 223, primaryOwner = node-2686 key = key-900-5, segment = 141, primaryOwner = node-26816
見やすさのために行を分けましたが、見事にバラバラのSegment、Nodeに配置されていますね。
KeyPartitionerを実装する
前述の例はデフォルト設定なので、データと配置されるSegment(とNode)にあまり関連がありませんでした。
これを、「key-xxx-yyy」、「value-xxx-zzz」の「xxx」の部分で同じSegmentに配置されるように、簡単なKeyPartitionerを実装してみたいと思います。
で、作成したKeyPartitionerがこちら。
src/main/scala/org/littlewings/keypartitioner/MyKeyPartitioner.scala
package org.littlewings.keypartitioner import org.infinispan.commons.util.Util import org.infinispan.configuration.cache.HashConfiguration import org.infinispan.distribution.ch.KeyPartitioner class MyKeyPartitioner extends KeyPartitioner { var configuration: HashConfiguration = _ override def init(configuration: HashConfiguration): Unit = { this.configuration = configuration } override def getSegment(key: AnyRef): Int = key.asInstanceOf[String].split("-")(1).toInt % configuration.numSegments }
「key-xxx-yyy」の「xxx」の部分だけ抜き出して数字に変換して、Segment数で割った余りを取るだけの簡単なKeyPartitionerです。デフォルト実装よりもはるかに簡易なので、マジメな実装を見たい場合は、デフォルトのHashFunctionPartitionerを確認すればよいかと。
なお、KeyPartitionerを実装するにあたって最低限実装しなくてはいけないのはgetSegmentメソッドなのですが、これだけだとSegmentに関する情報が何もないので、initメソッドをオーバーライドしてHashConfigurationを持つように実装しています。HashFunctionPartitionerも近いことをしています。
initメソッドはKeyPartitionerインターフェースでデフォルト実装が書かれていますが、メソッドの中身は空なので実質はオーバーライドすることになるのではないかなと思います。
それでは、このKeyPartitionerを利用して確認してみましょう。
作成したKeyPartitionerの動作確認
Infinispanの設定ファイルに、Distributed Cacheを追加して、先ほど設定したKeyPartitionerを定義します。
<distributed-cache name="customPartitionerDistributedCache" key-partitioner="org.littlewings.keypartitioner.MyKeyPartitioner"/>
これで、KeyPartitionerが設定されました。
先ほど、エントリの配置状況を確認したものと、ほぼ同等のコードで確認してみます。
※ほぼ同等、というのは、作成されたKeyPartitionerが設定されているかどうかを確認するコードも入っているからです
withCache[String, String]("customPartitionerDistributedCache", 3) { cache => cache.getCacheConfiguration.clustering.hash.keyPartitioner should be(a[MyKeyPartitioner]) for { i <- 1 to 10 j <- 1 to 5 } cache.put(s"key-${i}00-$j", s"value-${i}00-$j") val consistentHash = cache.getAdvancedCache.getDistributionManager.getConsistentHash val dm = cache.getAdvancedCache.getDistributionManager cache .keySet .asScala .toArray .sorted .foreach(key => println(s"key = ${key}, segment = ${consistentHash.getSegment(key)}, primaryOwner = ${dm.getPrimaryLocation(key)}")) } }
結果。
key = key-100-1, segment = 100, primaryOwner = node-56504 key = key-100-2, segment = 100, primaryOwner = node-56504 key = key-100-3, segment = 100, primaryOwner = node-56504 key = key-100-4, segment = 100, primaryOwner = node-56504 key = key-100-5, segment = 100, primaryOwner = node-56504 key = key-1000-1, segment = 232, primaryOwner = node-12128 key = key-1000-2, segment = 232, primaryOwner = node-12128 key = key-1000-3, segment = 232, primaryOwner = node-12128 key = key-1000-4, segment = 232, primaryOwner = node-12128 key = key-1000-5, segment = 232, primaryOwner = node-12128 key = key-200-1, segment = 200, primaryOwner = node-56504 key = key-200-2, segment = 200, primaryOwner = node-56504 key = key-200-3, segment = 200, primaryOwner = node-56504 key = key-200-4, segment = 200, primaryOwner = node-56504 key = key-200-5, segment = 200, primaryOwner = node-56504 key = key-300-1, segment = 44, primaryOwner = node-12128 key = key-300-2, segment = 44, primaryOwner = node-12128 key = key-300-3, segment = 44, primaryOwner = node-12128 key = key-300-4, segment = 44, primaryOwner = node-12128 key = key-300-5, segment = 44, primaryOwner = node-12128 key = key-400-1, segment = 144, primaryOwner = node-21416 key = key-400-2, segment = 144, primaryOwner = node-21416 key = key-400-3, segment = 144, primaryOwner = node-21416 key = key-400-4, segment = 144, primaryOwner = node-21416 key = key-400-5, segment = 144, primaryOwner = node-21416 key = key-500-1, segment = 244, primaryOwner = node-12128 key = key-500-2, segment = 244, primaryOwner = node-12128 key = key-500-3, segment = 244, primaryOwner = node-12128 key = key-500-4, segment = 244, primaryOwner = node-12128 key = key-500-5, segment = 244, primaryOwner = node-12128 key = key-600-1, segment = 88, primaryOwner = node-56504 key = key-600-2, segment = 88, primaryOwner = node-56504 key = key-600-3, segment = 88, primaryOwner = node-56504 key = key-600-4, segment = 88, primaryOwner = node-56504 key = key-600-5, segment = 88, primaryOwner = node-56504 key = key-700-1, segment = 188, primaryOwner = node-56504 key = key-700-2, segment = 188, primaryOwner = node-56504 key = key-700-3, segment = 188, primaryOwner = node-56504 key = key-700-4, segment = 188, primaryOwner = node-56504 key = key-700-5, segment = 188, primaryOwner = node-56504 key = key-800-1, segment = 32, primaryOwner = node-21416 key = key-800-2, segment = 32, primaryOwner = node-21416 key = key-800-3, segment = 32, primaryOwner = node-21416 key = key-800-4, segment = 32, primaryOwner = node-21416 key = key-800-5, segment = 32, primaryOwner = node-21416 key = key-900-1, segment = 132, primaryOwner = node-21416 key = key-900-2, segment = 132, primaryOwner = node-21416 key = key-900-3, segment = 132, primaryOwner = node-21416 key = key-900-4, segment = 132, primaryOwner = node-21416 key = key-900-5, segment = 132, primaryOwner = node-21416
うまくSegmentごとにグルーピングできましたね!OKそうです。
Distributed Streams APIと合わせて使う
データの配置Segmentをコントロールできたわけですが、この機能はDistributed Streams APIと合わせて使うことを意図していたのでした。
というわけで、Distributed Streams APIとも合わせて使ってみます。
実装した内容は、こちら。
it("Distributed Streams, using custom KeyPartitioner.") { withCache[String, String]("customPartitionerDistributedCache", 3) { cache => for { i <- 1 to 10 j <- 1 to 5 } cache.put(s"key-${i}00-$j", s"value-${i}00-$j") val consistentHash = cache.getAdvancedCache.getDistributionManager.getConsistentHash val dm = cache.getAdvancedCache.getDistributionManager val targetSegments = Array("key-100-1", "key-500-1", "key-900-1") .map(consistentHash.getSegment) .map(Integer.valueOf) .toSet val stream = cache.entrySet.stream.asInstanceOf[CacheStream[java.util.Map.Entry[String, String]]] val results = try { stream .filterKeySegments(targetSegments.asJava) .map[String](e => e.getValue) .collect(CacheCollectors.serializableCollector[String, java.util.List[String]](() => Collectors.toList[String])) } finally { stream.close() } results .asScala .foreach { value => println(s"value = ${value}, segment = ${consistentHash.getSegment(value.replace("value", "key"))}, primaryOwner = ${dm.getPrimaryLocation(value.replace("value", "key"))}") } } }
今回のKeyPartitionerだと「key-xxx-yyy」の「xxx」の部分でグルーピングするので、「xxx」が同じデータは、同じSegmentにマッピングされます。なので、「xxx」の部分でひとつキーを決めると、対象のSegmentを決めることができます。
ここでは、3つのSegmentを選んでみました。ConsistentHash#getSegmentで、キーに対応するSegmentを取得することができます。
val targetSegments = Array("key-100-1", "key-500-1", "key-900-1") .map(consistentHash.getSegment) .map(Integer.valueOf) .toSet
Distributed Streams APIでの適用時には、ここで決めたSegmentのSetをCacheStream#filterKeySegmentsに渡してフィルタリングします。
val stream = cache.entrySet.stream.asInstanceOf[CacheStream[java.util.Map.Entry[String, String]]] val results = try { stream .filterKeySegments(targetSegments.asJava) .map[String](e => e.getValue) .collect(CacheCollectors.serializableCollector[String, java.util.List[String]](() => Collectors.toList[String])) } finally { stream.close() }
Streamを使った処理そのものは、Map.Entryから値を抽出しただけの簡単なものです。
最後は結果を出力しますが、SegmentやNodeの情報も出すために値からキーを戻して(機械的なルールで戻せるので…)確認してみます。
results .asScala .foreach { value => println(s"value = ${value}, segment = ${consistentHash.getSegment(value.replace("value", "key"))}, primaryOwner = ${dm.getPrimaryLocation(value.replace("value", "key"))}")
結果は、このように。
value = value-100-1, segment = 100, primaryOwner = node-41225 value = value-100-4, segment = 100, primaryOwner = node-41225 value = value-100-5, segment = 100, primaryOwner = node-41225 value = value-100-2, segment = 100, primaryOwner = node-41225 value = value-100-3, segment = 100, primaryOwner = node-41225 value = value-900-1, segment = 132, primaryOwner = node-5504 value = value-900-4, segment = 132, primaryOwner = node-5504 value = value-900-5, segment = 132, primaryOwner = node-5504 value = value-900-2, segment = 132, primaryOwner = node-5504 value = value-900-3, segment = 132, primaryOwner = node-5504 value = value-500-4, segment = 244, primaryOwner = node-5504 value = value-500-5, segment = 244, primaryOwner = node-5504 value = value-500-2, segment = 244, primaryOwner = node-5504 value = value-500-3, segment = 244, primaryOwner = node-5504 value = value-500-1, segment = 244, primaryOwner = node-5504
指定したSegmentに該当するデータのみが抽出できていますね、うまく指定のSegmentでフィルタリングできたようです。
まとめ
KeyPartitionerを使って、データの配置先のSegmentをコントロールし、Distributed Streams APIでフィルタリングするところまでやってみました。
機能としては面白いのですが、ちゃんと理解していないと特定のSegmentにデータが偏ったりして後悔することになりそうですよね。使う時には、ちゃんと考えましょう、と…。
たいていのケースは、Grouping APIで事足りそうな気もしますけれど、どうなのでしょうね。
個人的には、こういう話は興味がありますが。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-key-partitioner