CLOVER🍀

That was when it all began.

HazelcastのData Affinityを試してみる

2014/4/26 追記)
コメントでDistributed Executor Serviceの使い方について指摘をいただいたので、修正しました。

この手の分散データ配置を行うソフトウェアでの、Data Affinityについての利用方法を、Hazelcastで。

Data Affinity
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#data-affinity

Data Affinity自体はなにかというと、「関連するデータは同じNodeに置きましょう」というお話で。

Hazelcastでは、ひとつのHazelcastInstanceから

Config cfg = new Config();
HazelcastInstance instance = Hazelcast.newHazelcastInstance(cfg);

複数のDistributed Mapを取得して

Map mapa = instance.getMap("mapa");
Map mapb = instance.getMap("mapb");
Map mapc = instance.getMap("mapc");

データを更新、削除したり

mapa.put("key1", value);
mapb.get("key1");
mapc.remove("key1");

ロックを取ったり、分散処理を行う時

instance.getLock ("key1").lock();

instance.getExecutorService().executeOnKeyOwner(runnable, "key1");

キーが同じであれば、同じNodeにデータを置いてくれるようです。

なんですけど、キーが違うものについては関連するデータであってもバラバラに配置されてしまいます。となると、関連するデータに対して処理を行う時に、ネットワークアクセスが発生しちゃう(他のNodeに取りにいったり保存しなければならない)ことになります。

この手の話でよく例に挙げられるのが、顧客データ(顧客IDとかをキー)と関連データで、それをひとまとめに処理したいみたいな話。

つまり、Executorなどの分散処理を実行する時に効果がある話ですね。

実装方法

これを実現するには、Hazelcastに登録するキー自体を少し変えて、PartitionAwareというインタフェースを実装する必要があります。インターフェースは、こんな感じ。

public interface PartitionAware<T> {
    T getPartitionKey();
}

getPartitionKeyメソッドで返却する値を、関連するデータで同じものを返してあげればOKです。

なお、分散処理を実装する時は、作成するRunnableやCallableにもPartitionAwareインターフェースを実装させることになります。

それでは、使ってみましょう。

準備

まずは、依存関係の定義。
build.sbt

name := "hazelcast-data-affinity"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.0"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked")

incOptions := incOptions.value.withNameHashing(true)

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "com.hazelcast" % "hazelcast" % "3.2",
  "org.scalatest" %% "scalatest" % "2.1.3" % "test"
)

関係ないですけど、ブログ上は初めてのScala 2.11です!

テストコードの雛形は、こんな感じ。
src/test/scala/org/littlewings/hazelcast/affinity/DataAffinitySpec.scala

package org.littlewings.hazelcast.affinity

import com.hazelcast.core.{Hazelcast, HazelcastInstance, Partition}

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class DataAffinitySpec extends FunSpec {
  // ここに、テストを書く!

  private def withHazelcast(instanceNumber: Int)(fun: HazelcastInstance => Unit): Unit = {
    val instances = (1 to instanceNumber).map(_ => Hazelcast.newHazelcastInstance)

    try {
      fun(instances.head)
    } finally {
      instances.foreach(_.getLifecycleService.shutdown())

      Hazelcast.shutdownAll()
    }
  }
}

N個のHazelcastInstanceを起動させる、お手軽クラスタテスト。

キーを定義する

Hazelcastのドキュメントのサンプルでは、顧客と注文を紐付けるようなキーを紹介していますが、こちらはもうちょい簡単に。
src/main/scala/org/littlewings/hazelcast/affinity/AffinityKey.scala

package org.littlewings.hazelcast.affinity

import com.hazelcast.core.PartitionAware

@SerialVersionUID(1L)
final class AffinityKey(val key: String) extends Serializable
                                         with PartitionAware[String] {
  override def getPartitionKey: String =
    key.substring(0, key.indexOf('-'))
}

キーの形式としては、

key[M]-[N]

を想定していて、「key[M]」の部分でグルーピングしようという発想です。

つまり、こういうイメージ。

  describe("Affinity Key") {
    it("prefix key") {
      new AffinityKey("key1-1").getPartitionKey should be ("key1")
      new AffinityKey("key1-2").getPartitionKey should be ("key1")
      new AffinityKey("key2-1").getPartitionKey should be ("key2")
      new AffinityKey("key2-2").getPartitionKey should be ("key2")
    }
  }

はい。

配置をまとめられるか、確認する

それでは、作成したキーを使用すれば、データの配置を本当にまとめられるか、確認してみます。

まずは、作成したキーを使わないコード。

    it("no affinity key") {
      val instanceNumber = 4

      withHazelcast(instanceNumber) { hazelcast =>
        val partitionService = hazelcast.getPartitionService

        val map1 = hazelcast.getMap[String, Integer]("map1")
        val map2 = hazelcast.getMap[String, Integer]("map2")
        val map3 = hazelcast.getMap[String, Integer]("map3")

        val keysValues =
          for {
            i <- 1 to 5
            j <- 1 to 5
          } yield (s"key$i-$j", i * j)

        keysValues.foreach { case (key, value) =>
          Array(map1, map2, map3).foreach(_.put(key, value))
        }

        keysValues.foreach { case (key, _) =>
          println(s"[No Affinity Key] key = $key, owner = ${partitionService.getPartition(key).getOwner}")
        }
      }

コンソールに出力される、配置結果はこんな感じ。

[No Affinity Key] key = key1-1, owner = Member [192.168.129.129]:5704
[No Affinity Key] key = key1-2, owner = Member [192.168.129.129]:5704
[No Affinity Key] key = key1-3, owner = Member [192.168.129.129]:5704
[No Affinity Key] key = key1-4, owner = Member [192.168.129.129]:5704
[No Affinity Key] key = key1-5, owner = Member [192.168.129.129]:5702

[No Affinity Key] key = key2-1, owner = Member [192.168.129.129]:5701 this
[No Affinity Key] key = key2-2, owner = Member [192.168.129.129]:5703
[No Affinity Key] key = key2-3, owner = Member [192.168.129.129]:5703
[No Affinity Key] key = key2-4, owner = Member [192.168.129.129]:5701 this
[No Affinity Key] key = key2-5, owner = Member [192.168.129.129]:5701 this

[No Affinity Key] key = key3-1, owner = Member [192.168.129.129]:5703
[No Affinity Key] key = key3-2, owner = Member [192.168.129.129]:5701 this
[No Affinity Key] key = key3-3, owner = Member [192.168.129.129]:5702
[No Affinity Key] key = key3-4, owner = Member [192.168.129.129]:5702
[No Affinity Key] key = key3-5, owner = Member [192.168.129.129]:5703

[No Affinity Key] key = key4-1, owner = Member [192.168.129.129]:5704
[No Affinity Key] key = key4-2, owner = Member [192.168.129.129]:5704
[No Affinity Key] key = key4-3, owner = Member [192.168.129.129]:5703
[No Affinity Key] key = key4-4, owner = Member [192.168.129.129]:5701 this
[No Affinity Key] key = key4-5, owner = Member [192.168.129.129]:5701 this

[No Affinity Key] key = key5-1, owner = Member [192.168.129.129]:5702
[No Affinity Key] key = key5-2, owner = Member [192.168.129.129]:5704
[No Affinity Key] key = key5-3, owner = Member [192.168.129.129]:5703
[No Affinity Key] key = key5-4, owner = Member [192.168.129.129]:5704
[No Affinity Key] key = key5-5, owner = Member [192.168.129.129]:5703

*あとで改行を入れました

まあ、バラバラですね。

これに対して、作成したキーを適用したコード。

    it("affinity key") {
      val instanceNumber = 4

      withHazelcast(instanceNumber) { hazelcast =>
        val partitionService = hazelcast.getPartitionService

        val map1 = hazelcast.getMap[AffinityKey, Integer]("map1")
        val map2 = hazelcast.getMap[AffinityKey, Integer]("map2")
        val map3 = hazelcast.getMap[AffinityKey, Integer]("map3")

        val keysValues =
          for {
            i <- 1 to 5
            j <- 1 to 5
          } yield (s"key$i-$j", i * j)

        keysValues.foreach { case (key, value) =>
          Array(map1, map2, map3).foreach(_.put(new AffinityKey(key), value))
        }

        keysValues.foreach { case (key, _) =>
          println(s"[Affinity Key] key = $key, owner = ${partitionService.getPartition(new AffinityKey(key)).getOwner}")
        }

        def getPartition(key: String): Partition =
          partitionService.getPartition(new AffinityKey(key))

        val partition1 = getPartition("key1-1")
        getPartition("key1-2").getOwner should be (partition1.getOwner)
        getPartition("key1-3").getOwner should be (partition1.getOwner)
        getPartition("key1-4").getOwner should be (partition1.getOwner)
        getPartition("key1-5").getOwner should be (partition1.getOwner)

        val partition2 = getPartition("key2-1")
        getPartition("key2-2").getOwner should be (partition2.getOwner)
        getPartition("key2-3").getOwner should be (partition2.getOwner)
        getPartition("key2-4").getOwner should be (partition2.getOwner)
        getPartition("key2-5").getOwner should be (partition2.getOwner)
      }
    }

こちらは、最後に配置状態のテストも行っています。関連データは、同じNodeに配置されるはずですので。

データを登録するところと

        keysValues.foreach { case (key, value) =>
          Array(map1, map2, map3).foreach(_.put(new AffinityKey(key), value))
        }

取得するところで、今回作成したキーを使用しています。

        def getPartition(key: String): Partition =
          partitionService.getPartition(new AffinityKey(key))

実行結果。

[Affinity Key] key = key1-1, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key1-2, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key1-3, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key1-4, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key1-5, owner = Member [192.168.129.129]:5701 this

[Affinity Key] key = key2-1, owner = Member [192.168.129.129]:5702
[Affinity Key] key = key2-2, owner = Member [192.168.129.129]:5702
[Affinity Key] key = key2-3, owner = Member [192.168.129.129]:5702
[Affinity Key] key = key2-4, owner = Member [192.168.129.129]:5702
[Affinity Key] key = key2-5, owner = Member [192.168.129.129]:5702

[Affinity Key] key = key3-1, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key3-2, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key3-3, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key3-4, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key3-5, owner = Member [192.168.129.129]:5701 this

[Affinity Key] key = key4-1, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key4-2, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key4-3, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key4-4, owner = Member [192.168.129.129]:5701 this
[Affinity Key] key = key4-5, owner = Member [192.168.129.129]:5701 this

[Affinity Key] key = key5-1, owner = Member [192.168.129.129]:5704
[Affinity Key] key = key5-2, owner = Member [192.168.129.129]:5704
[Affinity Key] key = key5-3, owner = Member [192.168.129.129]:5704
[Affinity Key] key = key5-4, owner = Member [192.168.129.129]:5704
[Affinity Key] key = key5-5, owner = Member [192.168.129.129]:5704

とりあえず、キーでキレイにグルーピングされました。

なんか、偏ってる気がするけど…。まあいいか…。

分散処理を実行してみよう

最後に、Distributed Executor Serviceを使用した分散処理を。

先ほどから、

        val keysValues =
          for {
            i <- 1 to 5
            j <- 1 to 5
          } yield (s"key$i-$j", i * j)

        keysValues.foreach { case (key, value) =>
          map.put(new AffinityKey(key), value)
        }

みたいな感じでDistrubuted Mapにデータを放り込んでいましたが、ここで

key[M]-[N]

の「key[M]」単位でNodeごとに値の合計を算出してみます。

PartitionAwareを実装した、Callableの実装を用意。
src/main/scala/org/littlewings/hazelcast/affinity/DataAffinityTask.scala

package org.littlewings.hazelcast.affinity

import java.util.concurrent.Callable

import com.hazelcast.core.{Hazelcast, HazelcastInstance, HazelcastInstanceAware, PartitionAware}

@SerialVersionUID(1L)
class DataAffinityTask(keyPrefix: String, range: Integer) extends Callable[Integer]
                                                          with PartitionAware[String]
                                                          with HazelcastInstanceAware
                                                          with Serializable {
  private var hazelcast: HazelcastInstance = _

  override def setHazelcastInstance(hazelcast: HazelcastInstance): Unit =
    this.hazelcast = hazelcast

  @throws(classOf[Exception])
  override def call: Integer = {
    val map = hazelcast.getMap[AffinityKey, Integer]("map")

    (1 to range).foldLeft(0) { (acc, i) =>
      acc + map.get(new AffinityKey(s"${keyPrefix}-$i"))
    }
  }

  override def getPartitionKey: String =
    keyPrefix
}

追記)
このクラスは、コメントでHazelcastInstanceAwareインターフェースを実装することで、RunnableやCallableの中で、HazelcastInstanceが設定可能なことを教えていただきました。
hisanoさん、ありがとうございます。

keyPrefixというのは、外から

executor.submit(new DataAffinityTask("key1", 5)

こういう感じに設定されることを想定しています。

データの配置をコントロールしているキーと、同じものを使用しましょうということですね。

用意したテストコード。

    it("execute") {
      val instanceNumber = 4

      withHazelcast(instanceNumber) { hazelcast =>
        val partitionService = hazelcast.getPartitionService

        val map = hazelcast.getMap[AffinityKey, Integer]("map")

        val keysValues =
          for {
            i <- 1 to 5
            j <- 1 to 5
          } yield (s"key$i-$j", i * j)

        keysValues.foreach { case (key, value) =>
          map.put(new AffinityKey(key), value)
        }

        val executor = hazelcast.getExecutorService("default")

        val futures =
          Array(executor.submit(new DataAffinityTask("key1", 5)),
                executor.submit(new DataAffinityTask("key2", 5)),
                executor.submit(new DataAffinityTask("key3", 5)),
                executor.submit(new DataAffinityTask("key4", 5)),
                executor.submit(new DataAffinityTask("key5", 5)))

        futures.map(_.get) should contain theSameElementsInOrderAs Array(15, 30, 45, 60, 75)
      }
    }

Distributed Executor Serviceの#executeや#submitを呼び出す時、Memberやキーを指定して呼び出すものがあると思いますが、今回の様にRunnableやCallable自体がPartitionAwareを実装している場合は、Memberやキーを指定せずにHazelcastに実行Nodeをコントロールさせるのがよいのだと思います。

とりあえず、HazelcastのData Affinityを試してみました。単純に分散配置可能なMapやコレクションとして扱っている時は関係のない話かもですが、分散処理を実行する時は気にするポイントになるかもしれませんね。

ところで、書いていて思ったのですが、こういうところが気になりました。

- CallableやRunnableの中でHazelcastInstanceを取得する場合は、どうするのが適切なのか?

  • 話の前提としてMapが多いですが、ListとかSetでも同じ話なのか?
  • 基本的に、Distributed Mapに自分のコードでput/getしていることが前提

最初は、とりあえず実行時のスレッドからHazelcastInstance名が割り出せそうだったのでそうやっちゃいましたが、ホントはひとつのJavaVM中でひとつしかインスタンス作らないとかのパターンになるのかなぁ?

また、put/getなどのオペレーション時のキーに直接関係しちゃうので、自分が操作していない(ライブラリやフレームワークが内部的に使用している)場合には手出しができませんね。

今回作成したサンプルは、こちらにアップしています。

https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-data-affinity