もうすぐ、Hazelcast 3.2がリリースされます。今現在は、3.2 RC-2の段階です。3.2は、Map Reduce APIが追加されるということで、日本でもちょくちょく名前を見かけていましたが、個人的には先行してちょっと試しています。
Map Reduce API自体はまだオフィシャルにドキュメントとしても現れていませんが、手元で動かしている中で「Data Localityってどうやって実現してるんだっけ?」というのがふと疑問に。
結論としては、「com.hazelcast.map.RecordStore」というインターフェースの実装、「com.hazelcast.map.DefaultRecordStore」というクラスを使用して、Nodeにローカルなデータを参照しているようです。
ただ、これらのクラスはHazelcastの公開されているJavadocには載っていない、内部のAPIになります。
ですので、本エントリの内容はHazelcastの内部APIを使用しているもののため、公開情報ではありませんし、今後使用できなくなる可能性が十分にありますので、その点についてはご理解ください。
以前、似たようなことをやったエントリとして、こういうのを書きました。
HazelcastのDistibuted Mapにおける、キーの分散状況を確認する
http://d.hatena.ne.jp/Kazuhira/20131122/1385129698
これと同じようなことを、RecordStoreを使って書いてみます。
とりあえず、準備。
build.sbt
name := "hazelcast-key-distribution" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.4" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked") libraryDependencies ++= Seq( "com.hazelcast" % "hazelcast" % "3.1.6", "org.scalatest" %% "scalatest" % "2.0" % "test" )
まだオフィシャルサイトからのダウンロードはできませんが、Scalaは2.10.4を使用。Maven Central Repositoryにはアップされていますので。
それでは、動作確認の雛形コード。
src/test/scala/org/littlewings/hazelcast/keydist/KeyDistributionSpec.scala
package org.littlewings.hazelcast.keydist import scala.collection.JavaConverters._ import com.hazelcast.core.{Hazelcast, HazelcastInstance} import com.hazelcast.instance.{HazelcastInstanceImpl, HazelcastInstanceProxy} import com.hazelcast.map.{DefaultRecordStore, MapService, RecordStore} import com.hazelcast.nio.serialization.{Data, SerializationConstants} import com.hazelcast.spi.impl.NodeEngineImpl import org.scalatest.FunSpec import org.scalatest.Matchers._ class KeyDistributionSpec extends FunSpec { describe("key distribution spec") { it("range key, entry test") { withHazelcast(2) { hazelcast => // ここに、HazelcastInstanceを使ったコードを書く } } def withHazelcast(instanceNumber: Int)(fun: HazelcastInstance => Unit): Unit = { val instances = (1 to instanceNumber).map(i => Hazelcast.newHazelcastInstance) try { fun(instances.head) } finally { Hazelcast.shutdownAll() } } }
withHazelcastメソッドで、HazelcastのNodeを指定数起動しつつ、うちひとつのHazelcastInstanceを使ってテストを行います。
def withHazelcast(instanceNumber: Int)(fun: HazelcastInstance => Unit): Unit = { val instances = (1 to instanceNumber).map(i => Hazelcast.newHazelcastInstance) try { fun(instances.head) } finally { Hazelcast.shutdownAll() } }
importしてあるこの部分、全部非公開APIです。
import com.hazelcast.instance.{HazelcastInstanceImpl, HazelcastInstanceProxy} import com.hazelcast.map.{DefaultRecordStore, MapService, RecordStore} import com.hazelcast.spi.impl.NodeEngineImpl
このNodeEngineインターフェース(の実装が、NodeEngineImplクラス)がHazelcastの内部APIで中心のクラスになっていて、こちらを使用するのですが、通常のAPIでは取得することができません。
仕方がないので、ここはひとつリフレクションで…。
private def getNodeEngineImpl(hazelcast: HazelcastInstance): NodeEngineImpl = { val method = classOf[HazelcastInstanceProxy].getDeclaredMethod("getOriginal") method.setAccessible(true) val original = method.invoke(hazelcast).asInstanceOf[HazelcastInstanceImpl] original.node.nodeEngine }
では、withHazelcastメソッドの中に、処理を書いていきます。
まずは、データ登録。
val map = hazelcast.getMap[String, String]("default") val entryRange = 1 to 20 val entries = entryRange.map(i => s"key$i" -> s"value$i").toMap entries.foreach { case (k, v) => map.put(k, v) }
続いて、自身のNodeが属するPartitionを取得しておきます。
val selfNode = hazelcast.getCluster.getLocalMember val partitionService = hazelcast.getPartitionService val selfPartitions = partitionService .getPartitions .asScala .withFilter(_.getOwner == selfNode)
そして、NodeEngineImplや関連するクラスの取得。
val nodeEngine = getNodeEngineImpl(hazelcast) val clusterService = nodeEngine.getClusterService val mapService: MapService = nodeEngine.getService(MapService.SERVICE_NAME) val serializationService = nodeEngine.getSerializationService
ClusterServiceは不要でしたが…。
MapServiceは、非公開APIですね。また、後の処理で使用するためにSerializationServiceを取得しています。
SerializationServiceは公開APIですが、HazelcastClientを使用していない場合は、取得する方法って公開されていない気がします。
では、自Nodeが属するパーティションに含まれるデータを、表示してみましょう。こんなコードを書いてみました。
selfPartitions .map { partition => (partition, mapService.getRecordStore(partition.getPartitionId, "default")) } .withFilter(!_._2.isEmpty) .foreach { case (partition, recordStore) => println(s"""|PartitionId[${partition.getPartitionId}] | Owner = ${partition.getOwner} | RecordStore: |${recordStore.entrySetObject.asScala.mkString(" ", System.lineSeparator + " ", "")} |""".stripMargin) }
実行結果。
> test 〜省略〜 PartitionId[69] Owner = Member [192.168.129.129]:5701 this RecordStore: Data{type=-11, partitionHash=-524753087, bufferSize=11, totalSize=27}=value5 PartitionId[18] Owner = Member [192.168.129.129]:5701 this RecordStore: Data{type=-11, partitionHash=1435012226, bufferSize=12, totalSize=28}=value15 PartitionId[99] Owner = Member [192.168.129.129]:5701 this RecordStore: Data{type=-11, partitionHash=-1285765290, bufferSize=12, totalSize=28}=value19 〜省略〜
と、こんな感じでパーティション内に保持してあるデータが表示されます。
MapService#getRecordStoreを使用することで、PartitionとDistributed Mapの名前が分かっていれば、そのPartitionに格納してあるデータを取得することができます。
mapService.getRecordStore(partition.getPartitionId, "default")
あとは、RecordStoreが保持しているデータを取得するという感じですね。
recordStore.entrySetObject
ただ、格納しているキーと値がそのままの形式で取得できるわけではなく、シリアライズされているのでこれを元に戻すためには、SerializationServiceを使用します。
続いて、今度は既知のキーからPartitionを引き、ここからデータを取得してみます。
entries.foreach { case (key, value) => val partition = partitionService.getPartition(key) val recordStore: RecordStore = mapService.getRecordStore(partition.getPartitionId, "default") val keyData = serializationService.toData(key) val valueData = recordStore.get(keyData) serializationService.toObject(keyData) should be (key) serializationService.toObject(valueData.asInstanceOf[Data]) should be (value) }
キーが分かればPartitionServiceからPartitionが取得できるので、このPartitionとMapServiceから、やはりRecordStoreを取得します。
先の通り、RecordStoreの中に格納されているキーと値はシリアライズされているので、これを戻すためにSerializationServiceを使用します。また、RecordStoreに格納されている値を引くためには、キーをシリアライズする必要があるので、こちらも合わせて行っています。
ちょっと内部APIに踏み込んだものになりましたが、データの分散状態を別のアプローチで確認してみました。また、シリアライズのAPIの使い方も垣間見れた感じですね。
今回書いたコードは、こちらにアップしています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-key-distribution