CLOVER🍀

That was when it all began.

HazelcastのEntry Processorを使う

Hazelcast 3.0からの新機能、Entry Processorを使ってみたいと思います。Oracle Coherenceにも同じ名前の機能があるみたいですけど、似たものでしょうか?(こっちは使ったことがありません)

Entry Processor
http://www.hazelcast.com/docs/3.1/manual/single_html/#MapEntryProcessor

いわゆる分散処理に属するものですが、Hazelcastには他にもDistributed Executor Serviceなるものがありますが、これは別の機会に。

それでは、簡単に概要を説明。

  • Entry Processorは、Hazelcastが提供するDistributed Mapを使った機能です
  • EntryProcessorインターフェースを実装したクラスを使って、Map中のエントリに対してアトミックな方法で処理を行うことができます
  • この時、明示的なロックは不要で、ロックはEntryProcessorを実行する時にHazelcastが行い、エントリに対する処理が終わり次第ロックを開放します

要は、Map中のエントリに対する分散処理です。エントリの指定は、単一のキーまたはすべてのエントリに対して行うことができます。あと、複雑なオブジェクトをエントリとして使用している場合は、シリアライズのコストを抑えるためにInMemoryFormatをOBJECTにすることが推奨らしいですね。

使ってみる

コードは、自分の趣味で、相変わらずClojureで書きます。EntryProcessorインターフェースを実装したクラスは、Serializableである必要があるので、lein-execでは厳しいだろうと思い、コンパイル形式にしました。

というわけで、プロジェクト作成。

$ lein new app hazelcast-entry-processor
cd hazelcast-entry-processor

project.cljは、このようにしました。
project.clj

(defproject hazelcast-entry-processor "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [com.hazelcast/hazelcast "3.1"]]
  :main hazelcast-entry-processor.core
  :aot :all
  :profiles {:uberjar {:aot :all}})

依存関係に、Hazelcastを加えています。なんか、昨日Hazelcast 3.1がリリースされていたみたいですよ…。

あと、「lein run」で実行したいので、「:aot :all」を入れているところがポイントです。

それでは、EntryProcessorインターフェースを実装したクラスを。実は、Entry Processorを使うためには、以下の2つのインターフェースを実装したクラスが必要です。

  • com.hazelcast.map.EntryProcessor
  • com.hazelcast.map.EntryBackupProcessor(不要な場合もある)

両者は、別々のクラスを用意しても、同じクラスが両方のインターフェースを実装していてもかまいません。ドキュメントのサンプルが両方実装していたので、今回はそちらに合わせておきました。EntryBackupProcessorのインスタンスは、EntryProcessor#getBackupProcessorメソッドで返却します。

では、実装したクラスです。
src/hazelcast_entry_processor/processor.clj

(ns hazelcast-entry-processor.processor
  (import (com.hazelcast.map EntryBackupProcessor EntryProcessor)
          (java.util Map Map$Entry)))

(gen-class
 :name hazelcast-entry-processor.processor.DoublingProcessor
 :implements [com.hazelcast.map.EntryBackupProcessor
              com.hazelcast.map.EntryProcessor])

(defn -process [this ^Map$Entry entry]
  (println (format "My Key[%s] Value[%s]" (. entry getKey) (. entry getValue)))
  ;; Doubling
  (. entry setValue (* (. entry getValue) 2))
  (. entry getValue))

;; ドキュメントのサンプルと同じく、EntryBackupProcessorは自分自身
(defn -getBackupProcessor [this]
  nil)

(defn -processBackup [this ^Map$Entry entry]
  (println (format "My Backup Key[%s] Value[%s]" (. entry getKey) (. entry getValue)))
  ;; バックアップも、2倍
  (. entry setValue (* (. entry getValue) 2)))

メインとなる処理は、EntryProcessor#processで、Map.Entryのインスタンスが渡ってくるのでこちらに対して演算処理を行います。

(defn -process [this ^Map$Entry entry]
  (println (format "My Key[%s] Value[%s]" (. entry getKey) (. entry getValue)))
  ;; Doubling
  (. entry setValue (* (. entry getValue) 2))
  (. entry getValue))

ここでは、最初にMap.Entryそのものの2倍して、その値を返しています。そういうことをすると、もちろんDistributed Mapそのもののエントリが変更されます。

なお、EntryProcessor#processの戻り値はObjectになります。

あとは、EntryBackupProcessorを返却している部分と、processBackupを実装している箇所です。

;; ドキュメントのサンプルと同じく、EntryBackupProcessorは自分自身
(defn -getBackupProcessor [this]
  nil)

(defn -processBackup [this ^Map$Entry entry]
  (println (format "My Backup Key[%s] Value[%s]" (. entry getKey) (. entry getValue)))
  ;; バックアップも、2倍
  (. entry setValue (* (. entry getValue) 2)))

processBackupメソッドは、EntryProcessor#processメソッドでエントリを変更してしまう場合に実装します。単純な計算処理であれば、EntryBackupProcessor自体要らなくなるので、EntryProcessor#getBackupProcessorでnullを返してしまえばよさそうです。

ところで、このクラスを実装するのに1番てこずったのは、ClojureでMap.Entryを指定する方法

;; import
  (import (com.hazelcast.map EntryBackupProcessor EntryProcessor)
          (java.util Map Map$Entry)))

;; Type Hint
^Map$Entry entry

と、インターフェースが規定するメソッドを実装する時には、gen-classの:methodsに明示的に書かなくてもいいということですね。これを最初に知らずに書いてたら、

Exception in thread "main" java.lang.ClassFormatError: Duplicate method name&signature in class file hazelcast-entry-processor/processor/DoublingProcessor, compiling:(...)

と言われて、しばらくハマっていました…。

そして、EntryProcessorを利用する側のコードです。
src/hazelcast_entry_processor/core.clj

(ns hazelcast-entry-processor.core
  (:gen-class)
  (:import (com.hazelcast.config Config InMemoryFormat MapConfig)
           (com.hazelcast.core Hazelcast HazelcastInstance IMap)
           (java.util Date)
           (hazelcast-entry-processor.processor DoublingProcessor)))

(defn with-hazelcast [f]
  (try
    (let [^Config config (Config.)
          ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
      ;; シリアライズのコストを抑えるため、フォーマットをOBJECTにすることを推奨
      (. (. config getMapConfig "default") setInMemoryFormat (InMemoryFormat/OBJECT))
      (try
        (f instance)
        (finally (.. instance getLifecycleService shutdown))))
    (finally (Hazelcast/shutdownAll))))

;; ジョブのマスタ
(defn start-master [mode]
  (with-hazelcast
    (fn [instance]
      (let [^IMap single-map (. instance getMap "single-map")
            ^IMap all-keys-map (. instance getMap "all-keys-map")]
        ;; ===== 単一のキーに対して処理を行う =====
        (println "========== Single Key Process ==========")
        (. single-map put "key1" 1)
        (. single-map put "key2" 2)
        (. single-map put "key3" 3)
        (let [result (. single-map executeOnKey "key1" (DoublingProcessor.))]
          (println (format "single-process-result: key[%s] value[%s]" "key1" result)))

        (println "========================================")

        (println "============ All Key Process ===========")
        ;; ===== Distributed Mapの全キーに対して処理を行う =====
        ;; 指定がない場合は、Distributed Mapの中身を再登録する
        (when (not (= mode "no-refresh"))
          (doseq [i (range 1 11)]
            (. all-keys-map put (str "key" i) i)))

        ;; IMap#executeOnEntriesの引数に、EntryProcessorを与えることで分散処理
        (let [^Map result-map (. all-keys-map executeOnEntries (DoublingProcessor.))]

          ;; 処理結果を表示して確認する
          (doseq [result-entry result-map]
            (println (format "execute-result: key[%s] value[%s]"
                             (. result-entry getKey)
                             (. result-entry getValue)))))

        (println "============ In Distributed Map Entries ===================")

        ;; オリジナルのDistributed Mapの中身を確認する
        (doseq [original-entry all-keys-map]
          (println (format "original-entry: key[%s] value[%s]"
                           (. original-entry getKey)
                           (. original-entry getValue))))))))

;; ジョブ待ちのサーバ
(defn start-worker []
  (with-hazelcast
   (fn [instance]
     (println (format "[%s] Worker Started." (Date.)))
     (doseq [_ (take-while (fn [l] false)
                           (repeatedly read-line))]))))

(defn -main
  [& args]
  (case (count args)
    0 (start-worker)
    (case (first args)
      "worker" (start-worker)
      "master" (start-master (second args)))))

このプログラムでは、ジョブのマスタとワーカーの役割を導入しています。ワーカーは、単にHazelcastクラスタに参加して浮いててもらうだけ、マスタがジョブを実行して結果を取得します。

この手のプログラムのパターン、Infinispanで何度書いたことか。

ちなみに、ワーカーはEnterを1回打つとシャットダウンします。

で、シリアライズの方法を設定しているのはここですね。

      (. (. config getMapConfig "default") setInMemoryFormat (InMemoryFormat/OBJECT))

推奨通りOBJECTにしましたが、これにしなくても動作します。あとは、パフォーマンス見合いということで。

続いて、ジョブの実行を行うところ。単一のキーに対して処理を行うためのMapと、全キーに対して処理を行うためのMapを今回は別々に用意しました。もちろん、一緒でも構わないはずです。

      (let [^IMap single-map (. instance getMap "single-map")
            ^IMap all-keys-map (. instance getMap "all-keys-map")]

まずは単一のキーに対する処理から。データを登録して、

        (. single-map put "key1" 1)
        (. single-map put "key2" 2)
        (. single-map put "key3" 3)

IMap#executeOnKey(Key, EntroProcessor)で単一のキーに関連するエントリに対して、EntryProcessorを実行します。

        (let [result (. single-map executeOnKey "key1" (DoublingProcessor.))]
          (println (format "single-process-result: key[%s] value[%s]" "key1" result)))

戻り値はObjectです。

続いて、全エントリに対して処理を行う場合。まずはデータ登録。

        (when (not (= mode "no-refresh"))
          (doseq [i (range 1 11)]
            (. all-keys-map put (str "key" i) i)))

ちょっとした確認のため、条件が指定されていなかったらデータを登録しないようにしています。あとは、IMap#executeOnEntries(EntryProcessor)で全エントリに対して処理を行います。

        ;; IMap#executeOnEntriesの引数に、EntryProcessorを与えることで分散処理
        (let [^Map result-map (. all-keys-map executeOnEntries (DoublingProcessor.))]

戻り値はMapであり、キーは変わらないということですね。ちなみに、ドキュメントではexecuteOnAllKeysというメソッド名で紹介されていますが、正しくはexecuteOnEntriesです。

それにしても、EntryProcessorとかはJavadocに載っていないのですが、どうして?

その後は、戻り値のMapの全エントリと、元々のMapの中身を全部表示して終了です。

動かしてみる

では、動作させてみましょう。最初は、ジョブのマスタだけで動かしてみます。

$ lein run master

自分でprintlnしている場所のみ、抜粋して載せます。

最初は、単一のキーに対する処理。

My Key[key1] Value[1]
single-process-result: key[key1] value[2]
========================================

EntryProcessor#processにより、値が2倍されて返されていることがわかります。「My Key Value」とか言っているのは、EntryProcessor#process中でMap.Entryのキーと値を出力しているからです。

続いて、全エントリに対する処理。

============ All Key Process ===========
My Key[key6] Value[6]
My Key[key5] Value[5]
My Key[key2] Value[2]
My Key[key10] Value[10]
My Key[key4] Value[4]
My Key[key7] Value[7]
My Key[key3] Value[3]
My Key[key9] Value[9]
My Key[key1] Value[1]
My Key[key8] Value[8]

EntryProcessor側で、全キーを受け取っていることがわかります。実行結果は、こうなります。

execute-result: key[key4] value[8]
execute-result: key[key3] value[6]
execute-result: key[key6] value[12]
execute-result: key[key5] value[10]
execute-result: key[key2] value[4]
execute-result: key[key1] value[2]
execute-result: key[key10] value[20]
execute-result: key[key8] value[16]
execute-result: key[key7] value[14]
execute-result: key[key9] value[18]

各キーに対応する値が、2倍されています。今回は、元々のMapのエントリも変更したので、こちらも2倍になっています。

============ In Distributed Map Entries ===================
original-entry: key[key2] value[4]
original-entry: key[key7] value[14]
original-entry: key[key1] value[2]
original-entry: key[key3] value[6]
original-entry: key[key4] value[8]
original-entry: key[key5] value[10]
original-entry: key[key9] value[18]
original-entry: key[key8] value[16]
original-entry: key[key6] value[12]
original-entry: key[key10] value[20]

ところで、EntryBackupProcesor#processBackupでもprintlnしているはずなのですが、まったく結果に現れていませんね。

こちらも合わせて、今度は分散環境で実行してみましょう。

最初に、ワーカー用のNodeを2つ起動します。

# Worker-1
$ lein run worker

# Worker-2
$ lein run worker

それぞれのコンソールで、こんな表示が行われます。

# Worker-1
Members [2] {
	Member [192.168.129.128]:5701 this
	Member [192.168.129.128]:5702
}

# Worker-2
Members [2] {
	Member [192.168.129.128]:5701
	Member [192.168.129.128]:5702 this
}

ここで、マスタを実行します。

# Job-Master
$ lein run master

それでは、コンソールに現れた結果を載せます。単一のキーに対する方は、省略しますね。

まずはワーカーNode。

# Worker-1
My Backup Key[key5] Value[5]
My Key[key10] Value[10]
My Key[key3] Value[3]
My Key[key6] Value[6]
My Key[key2] Value[2]
My Backup Key[key7] Value[7]
My Key[key9] Value[9]

# Worker-2
My Backup Key[key1] Value[1]
My Backup Key[key4] Value[4]
My Backup Key[key1] Value[1]
My Backup Key[key8] Value[8]
My Key[key7] Value[7]
My Backup Key[key10] Value[10]
My Backup Key[key3] Value[3]

そして、ジョブのマスタ。

# Job-Master
============ All Key Process ===========
My Key[key5] Value[5]
My Key[key4] Value[4]
My Key[key1] Value[1]
My Key[key8] Value[8]
My Backup Key[key6] Value[6]
My Backup Key[key2] Value[2]
My Backup Key[key9] Value[9]
execute-result: key[key4] value[8]
execute-result: key[key3] value[6]
execute-result: key[key6] value[12]
execute-result: key[key5] value[10]
execute-result: key[key2] value[4]
execute-result: key[key1] value[2]
execute-result: key[key10] value[20]
execute-result: key[key8] value[16]
execute-result: key[key7] value[14]
execute-result: key[key9] value[18]
============ In Distributed Map Entries ===================
original-entry: key[key2] value[4]
original-entry: key[key7] value[14]
original-entry: key[key1] value[2]
original-entry: key[key3] value[6]
original-entry: key[key4] value[8]
original-entry: key[key5] value[10]
original-entry: key[key9] value[18]
original-entry: key[key8] value[16]
original-entry: key[key6] value[12]
original-entry: key[key10] value[20]

今回のキーの分布を見ると、

  • ジョブのマスタ → key1, 4, 5, 8、バックアップ割り当はkey2, 6, 9
  • ワーカー1 → key2, 3, 6, 9, 10、バックアップ割り当てはkey7
  • ワーカー2 → key7、バックアップ割り当てはkey1, 3, 4, 8, 10

となっています。

EntryProcesorは、データ・ローカリティを意識した形で実行されます。

で、今回はEntryBackupProcessorの出力結果が出てきました。そして、ジョブのマスタはジョブが終了するとシャットダウンします。

ということは、Hazelcastはバックアップからエントリを復元しようとします。

この結果どうなるかということで、今度はDistributed Mapの中身を再登録せずに実行してみましょう。ワーカー用のNodeは浮いてもらったままです。

$ lein run master no-refresh

Mapへデータの再登録は行いません。

全キーに対するエントリの処理結果のみ、そしてジョブのマスタのみ結果を載せます。

# Job-Master
============ All Key Process ===========
My Key[key6] Value[12]
My Backup Key[key4] Value[8]
My Backup Key[key7] Value[14]
My Backup Key[key5] Value[10]
My Backup Key[key10] Value[20]
My Backup Key[key3] Value[6]
My Backup Key[key2] Value[4]
execute-result: key[key4] value[16]
execute-result: key[key3] value[12]
execute-result: key[key6] value[24]
execute-result: key[key5] value[20]
execute-result: key[key2] value[8]
execute-result: key[key1] value[4]
execute-result: key[key10] value[40]
execute-result: key[key8] value[32]
execute-result: key[key7] value[28]
execute-result: key[key9] value[36]
============ In Distributed Map Entries ===================
original-entry: key[key7] value[28]
original-entry: key[key10] value[40]
original-entry: key[key4] value[16]
original-entry: key[key1] value[4]
original-entry: key[key5] value[20]
original-entry: key[key9] value[36]
original-entry: key[key6] value[24]
original-entry: key[key3] value[12]
original-entry: key[key2] value[8]
original-entry: key[key8] value[32]

無事、元通り復元されました。

とはいえ、エントリそのものの値を変えるのはどうなんだろう?と思ってみたり。割とこういう使い方をするものなのでしょうか?

それでも、分散処理の一環なので、なかなか楽しい機能です。