CLOVER🍀

That was when it all began.

HazelcastのDistributed Executor Serviceを使う

前回のEntry Processorに続いて、今度はDistributed Executor Serviceを使ってみます。こちらは、Hazelcast 1系の時からあった機能みたいですね。

Distributed Executor Service
http://www.hazelcast.com/docs/3.1/manual/single_html/#ExecutorService

使ってみた感じ、InfinispanのDistributed Execution Frameworkとそんなに変わらなかったので、使い方にはそれほど迷うことはなかったです。

どちらかといえば、Clojureでハマっていましたが…。

ばっくり言ってしまうと、クラスタ内の各NodeにCallableインターフェースを実装したクラスのインスタンスを配り、分散実行してしまうという機能です。処理のフロントエンドとなるのは、java.util.concurrent.ExecutorServiceインターフェースを拡張したもので、ExecutorService#submit系のメソッドを呼び出すことで処理を依頼し、最後に結果がFutureで戻ってきます。

メソッドのパターンも、単一のメンバーや複数メンバークラスタ全体のメンバーを使う方法などがあるので、結果の受け取り方も依頼する方法でFutureになるのか、Futureを含んだMapになったりします。

トピックスとしては、他にも

  • タスクのキャンセル
  • ExecutionCallbackを使った結果の取得

があるのですが、今回は割愛します。

準備

今回も、Callableの実装をシリアライズして転送するので、プロジェクトを準備します。

$ lein new app hazelcast-distexec

project.clj

(defproject hazelcast-distexec "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [com.hazelcast/hazelcast "3.1"]]
  :main hazelcast-distexec.core
  :aot :all
  :profiles {:uberjar {:aot :all}})

aot難しいです!ちょっとしたクラスの依存関係で、簡単にハマります…。

では、分散実行されるCallableの実装。Clojureの関数って、RunnableでありCallableでもあるのですが、シリアライズされる関係上きちんとクラス定義をします。
src/hazelcast_distexec/task.clj

(ns hazelcast-distexec.task
  (:import (com.hazelcast.core Hazelcast HazelcastInstance IMap)
           (java.io Serializable)
           (java.util.concurrent Callable)))

(gen-class :name hazelcast-distexec.task.SumTask
           :implements [java.util.concurrent.Callable java.io.Serializable]
           :state mapName
           :constructors {[String] []}
           :init init)

(defn -init [name]
  [[] name])

(defn -call ^Integer [this]
  (let [^HazelcastInstance hazelcast (first (Hazelcast/getAllHazelcastInstances))
        ^IMap dist-map (. hazelcast getMap (.. this mapName))
        ^Set keys (.. dist-map localKeySet)]
    (println (format "My Local Keys => %s" keys))
    (reduce #(+ (. dist-map get %2) %1) 0 keys)))

Hazelcastとは関係ないですけど、:stateの使い方が最初わかりませんでした…。

今回は、Distributed Mapを使用するものとして、含まれるデータの型を以下のように考えます。

Map<Integer, Integer>

で、ここで各メンバーにローカルに割り当てられた値を合算して返却するというタスクにしています。この部分ですね。

;; メンバーのローカルキーを取得
^Set keys (.. dist-map localKeySet)

;; 合算
(reduce #(+ (. dist-map get %2) %1) 0 keys)))

IMap#localKeySetというのがあるのは、便利だなーと思います。

それでは、このタスクを利用する側。

今回も、ジョブのマスターとワーカーの役割を導入します。
src/hazelcast_distexec/core.clj

(ns hazelcast-distexec.core
  (:gen-class)
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance IExecutorService IMap Member)
           (java.util Date Map)
           (java.util.concurrent Future)
           (hazelcast-distexec.task SumTask)))

(defn- with-hazelcast [f]
  (try
    ;; HazelcastInstanceの作成
    (let [^Config config (Config.)
          ^HazelcastInstance hazelcast (Hazelcast/newHazelcastInstance config)]
      (f hazelcast)
      ;; HazelcastInstanceをシャットダウン
      (.. hazelcast getLifecycleService shutdown))

    ;; 全HazelcastInstanceをシャットダウン
    (finally (Hazelcast/shutdownAll))))

(defn- start-master []
  (with-hazelcast
    (fn [hazelcast]
      ;; ExecutorServiceの取得
      (let [^String map-name "dist-map"
            ^IMap dist-map (. hazelcast getMap map-name)
            ^IExecutorService executorService (. hazelcast getExecutorService "default")]
        ;; データの初期登録
        (doseq [i (range 1 11)]
          (. dist-map put i i))

        ;; submitで、クラスタ内のどこかのメンバーで実行される
        (let [^Callable task (SumTask. map-name)
              ^Future f (. executorService submit task)]
          (println "=============== submit[START] ===============")
          (println (format "value = %s" (.. f get)))
          (println "=============== submit[END] ==============="))

        (println)

        ;; submitToMemberで、指定したMemberで実行される
        (let [^Callable task (SumTask. map-name)
              ^Member member (first (.. hazelcast getCluster getMembers))
              ^Future f (. executorService submitToMember task member)]
          (println "=============== submitToMember[START] ===============")
          (println (format "%s, value = %s" member (.. f get)))
          (println "=============== submitToMember[END] ==============="))

        (println)

        ;; submitToKeyOwnerで、指定したキーの所有者Memberで実行される
        (let [^Callable task (SumTask. map-name)
              key 5
              ^Future f (. executorService submitToKeyOwner task key)]
          (println "=============== submitToKeyOwner[START] ===============")
          (println (format "spec key = %s, value = %s" key (.. f get)))
          (println "=============== submitToKeyOwner[END] ==============="))

        (println)

        ;; submitToMembersで、Setで指定したMemberで実行される
        (let [^Callable task (SumTask. map-name)
              ^Map future-map (. executorService submitToMembers task (.. hazelcast getCluster getMembers))]
          (println "=============== submitToMembers[START] ===============")
          (doseq [m future-map]
            (println (format "[%s]: value = %s" (. m getKey) (.. m getValue get))))
          (println "=============== submitToMembers[END] ==============="))

        (println)

        ;; submitToAllMembersで、クラスタ内の全Member
        (let [^Callable task (SumTask. map-name)
              ^Map future-map (. executorService submitToAllMembers task)]
          (println "=============== submitToAllMembers[START] ===============")
          (doseq [m future-map]
            (println (format "[%s]: value = %s" (. m getKey) (.. m getValue get))))
          (println "=============== submitToAllMembers[END] ==============="))))))

(defn- start-worker []
  (with-hazelcast
    (fn [hazelcast]
      (println (format "[%s] Start Worker..." (Date.)))
      (read-line))))

(defn -main
  [& args]
  (case (count args)
    0 (start-worker)
    (case (first args)
      "worker" (start-worker)
      "master" (start-master)
      (start-worker))))

ワーカーはやっぱり浮いててもらうだけで、Enterを入力すると即終了します。

ジョブのマスターについては、最初に初期データを登録して、ExecutorServiceに各種処理の依頼をします。

ちなみに、正確にはjava.util.concurrent.ExecutorServiceの型ではなく、com.hazelcast.core.IExecutorServiceの型を使用します。

      ;; ExecutorServiceの取得
      (let [^String map-name "dist-map"
            ^IMap dist-map (. hazelcast getMap map-name)
            ^IExecutorService executorService (. hazelcast getExecutorService "default")]

この時、Distributed Mapも一緒に作ってしまいます。

で、初期データの登録。

        ;; データの初期登録
        (doseq [i (range 1 11)]
          (. dist-map put i i))

タスクを実行する際には、Callableのインタンスを渡すわけですが、この時使用するsubmitメソッドでタスクを実行する範囲が変わります。

submitを使うと、戻り値はFutureになります。

        ;; submitで、クラスタ内のどこかのメンバーで実行される
        (let [^Callable task (SumTask. map-name)
              ^Future f (. executorService submit task)]
          (println "=============== submit[START] ===============")
          (println (format "value = %s" (.. f get)))
          (println "=============== submit[END] ==============="))

        (println)

submitToMemberを使った場合も、戻り値はFutureになります。メンバーは、HazelcastInstance#getCluster#getMembersの最初に選出されるものにしています。

        ;; submitToMemberで、指定したMemberで実行される
        (let [^Callable task (SumTask. map-name)
              ^Member member (first (.. hazelcast getCluster getMembers))
              ^Future f (. executorService submitToMember task member)]
          (println "=============== submitToMember[START] ===============")
          (println (format "%s, value = %s" member (.. f get)))
          (println "=============== submitToMember[END] ==============="))

submitToKeyOwnerの場合は、戻り値はFutureになります。キーは「5」とし、このキーの所有者となるメンバーで実行されるようになります。

        ;; submitToKeyOwnerで、指定したキーの所有者Memberで実行される
        (let [^Callable task (SumTask. map-name)
              key 5
              ^Future f (. executorService submitToKeyOwner task key)]
          (println "=============== submitToKeyOwner[START] ===============")
          (println (format "spec key = %s, value = %s" key (.. f get)))
          (println "=============== submitToKeyOwner[END] ==============="))

submitToMembersの場合は、戻り値はMapとなります。今回は、クラスタ内の全メンバーを対象にしました。まあ、これだとsubmitToAllMembersと同じですが。

        ;; submitToMembersで、Setで指定したMemberで実行される
        (let [^Callable task (SumTask. map-name)
              ^Map future-map (. executorService submitToMembers task (.. hazelcast getCluster getMembers))]
          (println "=============== submitToMembers[START] ===============")
          (doseq [m future-map]
            (println (format "[%s]: value = %s" (. m getKey) (.. m getValue get))))
          (println "=============== submitToMembers[END] ==============="))

submitToAllMembersの場合は、戻り値はMapとなります。

        ;; submitToAllMembersで、クラスタ内の全Member
        (let [^Callable task (SumTask. map-name)
              ^Map future-map (. executorService submitToAllMembers task)]
          (println "=============== submitToAllMembers[START] ===============")
          (doseq [m future-map]
            (println (format "[%s]: value = %s" (. m getKey) (.. m getValue get))))
          (println "=============== submitToAllMembers[END] ==============="))))))

なので、今回の例ではCallableを割り当てられた先で、自分の担当するキーに応じた値を合算してくるという感じですね。

よって、クラスタ内に複数のメンバーがいた場合には、単一のメンバーに処理を依頼するケースについては、完全な合算はできないことになりますが…。

実行する

それでは、作成したコードを実行してみます。

まずはStandaloneで。

$ lein run master

今回は、メンバーはひとりしかいないので

Members [1] {
	Member [192.168.129.128]:5701 this
}

結果は

=============== submit[START] ===============
My Local Keys => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
value = 55
=============== submit[END] ===============

=============== submit[START] ===============
My Local Keys => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Member [192.168.129.128]:5701 this, value = 55
=============== submit[END] ===============

=============== submitToKeyOwner[START] ===============
My Local Keys => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
spec key = 5, value = 55
=============== submitToKeyOwner[END] ===============

=============== submitToMembers[START] ===============
My Local Keys => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[Member [192.168.129.128]:5701 this]: value = 55
=============== submitToMembers[END] ===============

=============== submitToAllMembers[START] ===============
My Local Keys => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[Member [192.168.129.128]:5701 this]: value = 55
=============== submitToAllMembers[END] ===============

となります。全部、1+2+...+10で、55になりましたね。

メンバーがひとりなので、全データが単一のメンバーの所有物になりますので。

では、今度はクラスタにワーカーを2つ入れてみましょう。

# Worker-1
$ lein run worker

# Worker-2
$ lein run worker

それぞれ、起動が完了するとこういう見え方になります。

# Worker-1
Members [2] {
	Member [192.168.129.128]:5701 this
	Member [192.168.129.128]:5702
}

# Worker-2
Members [2] {
	Member [192.168.129.128]:5701
	Member [192.168.129.128]:5702 this
}

では、ここからジョブのマスターを実行します。

$ lein run master

まずは、クラスタに参加して

Members [3] {
	Member [192.168.129.128]:5701
	Member [192.168.129.128]:5702
	Member [192.168.129.128]:5703 this
}

結果の出力は、こうなります。

My Local Keys => [8, 10, 2, 3, 7]
=============== submit[START] ===============
value = 30
=============== submit[END] ===============

=============== submitToMember[START] ===============
Member [192.168.129.128]:5701, value = 16
=============== submitToMember[END] ===============

=============== submitToKeyOwner[START] ===============
spec key = 5, value = 16
=============== submitToKeyOwner[END] ===============

My Local Keys => [8, 10, 2, 3, 7]
=============== submitToMembers[START] ===============
[Member [192.168.129.128]:5701]: value = 16
[Member [192.168.129.128]:5702]: value = 9
[Member [192.168.129.128]:5703 this]: value = 30
=============== submitToMembers[END] ===============

=============== submitToAllMembers[START] ===============
My Local Keys => [8, 10, 2, 3, 7]
[Member [192.168.129.128]:5701]: value = 16
[Member [192.168.129.128]:5702]: value = 9
[Member [192.168.129.128]:5703 this]: value = 30
=============== submitToAllMembers[END] ===============

だいぶ結果が変わりましたね。複数のメンバーを指定している方は、結果が分かれているので、別途合算しないと55になりません。どのメンバーの計算結果は、出力されているメンバーの情報を見ればわかります。

その他については、マスタはsubmitメソッドのみ処理を行ったみたいですね。submitToMemberはクラスタの最初のメンバーということでワーカーにいったみたいですし、submitToKeyOwnerではキー5の所有者はワーカーになったようです。

ちなみに、ジョブのマスターの所有しているキーは、今回は以下の様になりました。

My Local Keys => [8, 10, 2, 3, 7]

ワーカー側の出力を見てみましょう。

# Worker-1
My Local Keys => [4, 5, 1, 6]
My Local Keys => [4, 5, 1, 6]
My Local Keys => [4, 5, 1, 6]
My Local Keys => [4, 5, 1, 6]

# Worker-2
My Local Keys => [9]
My Local Keys => [9]

数から考えると、submitToMemberとsubmitToKeyOwnerで仕事をしたのは、ひとつめのワーカーですね。

とまあ、こんな感じの分散処理フレームワークでした。Entry Processorとはちょっと処理の粒度が違いますね。適宜使い分けって感じでしょう。

ところで、submitToKeyOwnerでどうやってキーの所有者を選出しているかですが、PartionServiceを使えばよいみたいですよ。PartitionServiceは、HazelcastInstance#getPartitionServiceで取得できそうです。

とはいえ、今回はMapにしましたけど、ここでの「キー」って何者でしょうね?