CLOVER🍀

That was when it all began.

Hazelcastが提供するデータ構造ひとめぐり

Hazelcast入門ということで、ドキュメントを見ながらHazelcastが提供する各種データ構造をさらっと使ってみたいと思います。

Distributed Data Structures
http://www.hazelcast.com/docs/3.0/manual/single_html/#DDS

Hazelcastが提供するDistributedなデータ構造には、以下のものがあります。

  • Distributed Map
  • Distributed Queue
  • Distributed MultiMap
  • Distributed Topic
  • Distributed Set
  • Distributed List

Topicはデータ構造と言えるのか…あと、Distributed LockとDistributed Events、Distributed Semaphore(ドキュメント未記載、Javadocにはあり)は対象から外してあります。

けっこう、様々なデータ構造がありますね。Mapだけじゃないんだなーと。

いずれのデータ構造にも、以下のようのな特徴があります。

  • データは、クラスタ内の全Nodeに対して均等に分配され、クラスタ中の各Nodeは(1/クラスタのNode数 × データの総量)+バックアップを保持します
  • もしメンバーがダウンした場合は、バックアップから同じデータをレプリケートし、動的に再分配します。結果、データは失われません
  • 新しいNodeがクラスタに参加した時は、新しいNodeはクラスタからいくらかのエントリの所有権の獲得とロードを行います。新しいNodeは(1/n × データの総量)+バックアップを持つことになります
  • 単一のクラスタのマスタや、SPOFとなるものがありません。クラスタ中の各Nodeの関係は、平等、対等となっています。そして、外部のサーバやマスタなどに依存するといったことはありません

訳がぎこちない…。

要は、データはバックアップを取って持ち、Nodeがダウンすればそこから復元するし、Nodeが追加されれば再分配しますよ、と。そして、単一障害点とかマスタの概念を持たないってことですね。

あと、データがシリアライズされて保存されるのはわかっていましたが、各種のデータ構造からデータを取得などする際には防御的コピーが入っている様で、データ構造から取得したオブジェクトの参照を変更しても、再度取得した時にはその変更は反映されていませんでした。

なので、基本的にこれらのデータには

  • 追加
  • 置き換え
  • 削除

といった更新オペレーションと、データの取得が中心になるということですね。

ちょっと長くなりましたが、それでは各データ構造を簡単に使っていってみましょう。

Distributed Map

Hazelcastによるjava.util.Mapの実装で、java.util.concurrent.ConcurrentMapの実装でもあります。Hazelcast内のインターフェースとしては、com.hazelcast.core.IMapの実装です。

取得方法は、HazelcastInstande#getMapです。

それでは、サンプルコード。最初なので、少し丁寧に見ていきます。基本的に、2つ以上のプログラムを同時に動かしてクラスタを構成して確認します。Node同士の待ち合わせとかは、超適当な実装になっていますが、そちらはご了承のほどを…。

基本的に

  • 先に起動して他のメンバーを待つプログラム
  • 後から起動して、データを参照したり登録して、先にシャットダウンするプログラム

の2本構成です。

では、まず1本目。
hazelcast_map_1.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-map
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance)
           (java.util Map)))

(try
  ;; HazelcastInstanceを作成する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; Mapを取得
    (let [^Map customer-map (. instance getMap "customers")]
      (. customer-map put "customer-1" {:name "Taro" :age 22})
      (. customer-map put "customer-2" {:name "Hanako" :age 20})
      (. customer-map put "customer-3" {:name "Ken" :age 18})

      ;; 別のサーバがデータを登録するのを待つ
      (doseq [_ (take-while
                 false?
                 (repeatedly (fn []
                               (do (println "Waiting...")
                                   (Thread/sleep 3000)
                                   (contains? customer-map "customer-4")))))])

      ;; 別のサーバがシャットダウンするのを待つ
      (Thread/sleep 5000)

      ;; 現在のMapの中身を表示
      (doseq [entry customer-map]
        (println (str "receive entry, " entry))))

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))

  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

今回は、普通にMapとして扱いました。人を、3人ほど叩き込んでいます。

2本目。
hazelcast_map_2.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-map
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance)
           (java.util Map)))

(try
  ;; HazelcastInstanceを作成する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; Mapを取得する
    (let [^Map customer-map (. instance getMap "customers")]
      ;; 取得したMapに登録してある情報を出力する
      (doseq [entry customer-map]
        (println (str "receive entry, " entry)))

      ;; Mapにデータを登録する
      (. customer-map put "customer-4" {:name "Katsuo" :age 11})
      (. customer-map put "customer-5" {:name "Wakame" :age 7})
      (. customer-map put "customer-1" {:name "Tarao" :age 3}))

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))

  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

基本的に、put/getしているだけです。先に起動したサーバが登録したデータを参照して、それから3人追加するというものになっています。

では、Mapについてはちょっと動きまで紹介しておきましょう。

1本目、起動。

$ lein exec hazelcast_map_1.clj
10 11, 2013 11:42:07 午後 com.hazelcast.instance.DefaultAddressPicker
情報: Prefer IPv4 stack is true.

〜省略〜

10 11, 2013 11:42:12 午後 com.hazelcast.cluster.MulticastJoiner
情報: [192.168.129.128]:5701 [dev] 


Members [1] {
	Member [192.168.129.128]:5701 this
}

10 11, 2013 11:42:12 午後 com.hazelcast.core.LifecycleService
情報: [192.168.129.128]:5701 [dev] Address[192.168.129.128]:5701 is STARTED
10 11, 2013 11:42:12 午後 com.hazelcast.partition.PartitionService
情報: [192.168.129.128]:5701 [dev] Initializing cluster partition table first arrangement...
Waiting...
Waiting...

起動したら、他のサーバ待ちになります。

ここで、2本目を起動。

$ lein exec hazelcast_map_2.clj 
10 11, 2013 11:43:19 午後 com.hazelcast.instance.DefaultAddressPicker
情報: Prefer IPv4 stack is true.
10 11, 2013 11:43:19 午後 com.hazelcast.instance.DefaultAddressPicker
情報: Picked Address[192.168.129.128]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true

〜省略〜

情報: [192.168.129.128]:5702 [dev] Creating MulticastJoiner
10 11, 2013 11:43:19 午後 com.hazelcast.core.LifecycleService
情報: [192.168.129.128]:5702 [dev] Address[192.168.129.128]:5702 is STARTING
10 11, 2013 11:43:19 午後 com.hazelcast.nio.SocketConnector
情報: [192.168.129.128]:5702 [dev] Connecting to /192.168.129.128:5701, timeout: 0, bind-any: true
10 11, 2013 11:43:19 午後 com.hazelcast.nio.TcpIpConnectionManager
情報: [192.168.129.128]:5702 [dev] 34179 accepted socket connection from /192.168.129.128:5701
10 11, 2013 11:43:24 午後 com.hazelcast.cluster.ClusterService
情報: [192.168.129.128]:5702 [dev] 

Members [2] {
	Member [192.168.129.128]:5701
	Member [192.168.129.128]:5702 this
}

10 11, 2013 11:43:26 午後 com.hazelcast.core.LifecycleService
情報: [192.168.129.128]:5702 [dev] Address[192.168.129.128]:5702 is STARTED
receive entry, customer-2={:age 20, :name "Hanako"}
receive entry, customer-1={:age 22, :name "Taro"}
receive entry, customer-3={:age 18, :name "Ken"}
10 11, 2013 11:43:26 午後 com.hazelcast.core.LifecycleService
情報: [192.168.129.128]:5702 [dev] Address[192.168.129.128]:5702 is SHUTTING_DOWN
10 11, 2013 11:43:26 午後 com.hazelcast.nio.TcpIpConnection
情報: [192.168.129.128]:5702 [dev] Connection [Address[192.168.129.128]:5701] lost. Reason: java.io.EOFException[Remote socket closed!]
10 11, 2013 11:43:26 午後 com.hazelcast.initializer
情報: [192.168.129.128]:5702 [dev] Destroying node initializer.
10 11, 2013 11:43:26 午後 com.hazelcast.instance.Node
情報: [192.168.129.128]:5702 [dev] Hazelcast Shutdown is completed in 94 ms.
10 11, 2013 11:43:26 午後 com.hazelcast.core.LifecycleService
情報: [192.168.129.128]:5702 [dev] Address[192.168.129.128]:5702 is SHUTDOWN

起動したら、クラスタに参加してデータを表示、そしてシャットダウンします。

一方、1本目のプログラムではこういう動きをしています。

10 11, 2013 11:43:24 午後 com.hazelcast.cluster.ClusterService
情報: [192.168.129.128]:5701 [dev] 

Members [2] {
	Member [192.168.129.128]:5701 this
	Member [192.168.129.128]:5702
}

10 11, 2013 11:43:25 午後 com.hazelcast.partition.PartitionService
情報: [192.168.129.128]:5701 [dev] Re-partitioning cluster data... Migration queue size: 135
10 11, 2013 11:43:26 午後 com.hazelcast.partition.PartitionService
情報: [192.168.129.128]:5701 [dev] All migration tasks has been completed, queues are empty.
10 11, 2013 11:43:26 午後 com.hazelcast.nio.TcpIpConnection
情報: [192.168.129.128]:5701 [dev] Connection [Address[192.168.129.128]:5702] lost. Reason: Socket explicitly closed
10 11, 2013 11:43:26 午後 com.hazelcast.cluster.ClusterService
情報: [192.168.129.128]:5701 [dev] Removing Member [192.168.129.128]:5702
10 11, 2013 11:43:26 午後 com.hazelcast.cluster.ClusterService
情報: [192.168.129.128]:5701 [dev] 

Members [1] {
	Member [192.168.129.128]:5701 this
}

10 11, 2013 11:43:29 午後 com.hazelcast.partition.PartitionService
情報: [192.168.129.128]:5701 [dev] Partition balance is ok, no need to re-partition cluster data... 
receive entry, customer-2={:age 20, :name "Hanako"}
receive entry, customer-3={:age 18, :name "Ken"}
receive entry, customer-5={:age 7, :name "Wakame"}
receive entry, customer-4={:age 11, :name "Katsuo"}
receive entry, customer-1={:age 3, :name "Tarao"}
10 11, 2013 11:43:32 午後 com.hazelcast.core.LifecycleService
情報: [192.168.129.128]:5701 [dev] Address[192.168.129.128]:5701 is SHUTTING_DOWN
10 11, 2013 11:43:32 午後 com.hazelcast.initializer
情報: [192.168.129.128]:5701 [dev] Destroying node initializer.
10 11, 2013 11:43:32 午後 com.hazelcast.instance.Node
情報: [192.168.129.128]:5701 [dev] Hazelcast Shutdown is completed in 26 ms.
10 11, 2013 11:43:32 午後 com.hazelcast.core.LifecycleService
情報: [192.168.129.128]:5701 [dev] Address[192.168.129.128]:5701 is SHUTDOWN

クラスタにメンバーが増えたことを検知し、その後メンバーが離脱したことを検知します。最後にデータを表示してシャットダウンです。

ポイントは、2つ目のNodeで1つ目のNodeが登録したデータが見え、2つ目のNodeが離脱した後でも追加したデータが、1つ目のNodeから失われず見えている、ということですね。

他のデータ構造でも、基本的にこの特性を備えています。

では、あとは少し簡単に紹介していきましょう。

Distributed Queue

続いて、Queueです。java.util.Queue、java.util.concurrent.BlockingQueueの実装です。com.hazelcast.core.IQueueの実装でもあります。FIFOですよ。

では、サンプルを。

1本目。
hazelcast_queue_1.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-queue
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance)
           (java.util Queue)))

(try
  ;; HazelcastInstanceを作成する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; Queueを取得する
    (let [^Queue customer-queue (. instance getQueue "customers")]
      ;; Queueにデータを登録する
      (. customer-queue offer {:name "Taro" :age 22})
      (. customer-queue offer {:name "Hanako" :age 20})
      (. customer-queue offer {:name "Ken" :age 18})

      ;; 別のサーバがQueueのデータを取得するのを待つ
      (doseq [_ (take-while
                 true?
                 (repeatedly (fn []
                               (do (println "Waiting...")
                                   (Thread/sleep 3000)
                                   (empty? customer-queue)))))])

      ;; 別のサーバがシャットダウンするのを待つ
      (Thread/sleep 10000)

      ;; Queueの中身を全て取り出して表示
      (doseq [_ (range 0 (count customer-queue))]
        (println (str "queue entry => " (. customer-queue poll)))))

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))
  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

2本目。
hazelcast_queue_2.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-queue
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance)
           (java.util Queue)))

(try
  ;; HazelcastInstanceを作成する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; Queueを取得する
    (let [^Queue customer-queue (. instance getQueue "customers")]
      ;; Queueの中身を全て取り出して表示
      (doseq [_ (range 0 (count customer-queue))]
        (println (str "queue entry => " (. customer-queue poll))))

      (println (count customer-queue))

      ;; Queueにデータを登録する
      (. customer-queue offer {:name "Katsuo" :age 11})
      (. customer-queue offer {:name "Wakame" :age 7})
      (. customer-queue offer {:name "Tarao" :age 3}))

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))
  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

このプログラムでは、ひとつ目のNodeが登録したデータをふたつ目のNodeが全て取り出し、別のデータを登録してシャットダウンします。残ったNodeは、ふたつ目のNodeが登録したデータを取り出してシャットダウンします。

Distributed MultiMap

ひとつのキーに対して、複数の値を紐付けられるMapです。これはJDKに対応するインターフェースがないので、com.hazelcast.core.MultiMapインターフェースの実装となっています。

キーに対応する値はCollectionとなり、MultiMap#sizeは全データの数を返します。例えば、ひとつのキーにデータを3つ登録した場合は、MultiMap#sizeは3です。この他、MultiMap#entrySetでは、複数登録されたデータは各エントリとして取得されるため、同一のキーに対するイテレーションが複数回行われるかも、ということになります。

では、使ったコードを。

1本目。
hazelcast_multimap_1.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-multimap
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance MultiMap)))

(try
  ;; HazelcastInstanceを作成する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; MultiMapを取得する
    (let [^MultiMap customer-multimap (. instance getMultiMap "customers")]
      ;; MultiMapにデータを登録する
      (. customer-multimap put "NEW HORIZEN" {:name "Ken"})
      (. customer-multimap put "NEW HORIZEN" {:name "Kumi"})
      (. customer-multimap put "NEW HORIZEN" {:name "Mike"})

      ;; この場合、要素数は3となるらしい
      (assert (= (. customer-multimap size) 3))

      (doseq [entry (. customer-multimap entrySet)]
        (println (str (. entry getKey) " => " (. entry getValue))))

      ;; 別のサーバがMultiMapにデータを登録するのを待つ
      (doseq [_ (take-while
                 false?
                 (repeatedly (fn []
                               (do (println "Waiting...")
                                   (Thread/sleep 3000)
                                   (> (. customer-multimap size) 3)))))])

      ;; 別のサーバがシャットダウンするのを待つ
      (Thread/sleep 5000)

      (assert (= (. customer-multimap size) 6))

      ;; MultiMapに登録されたデータを全て出力する
      (doseq [entry (. customer-multimap entrySet)]
        (println (str (. entry getKey) " => " (. entry getValue)))))

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))

  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

2本目。
hazelcast_multimap_2.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-multimap
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance MultiMap)))

(try
  ;; HazelcastInstanceを作成する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; MultiMapを取得する
    (let [^MultiMap customer-multimap (. instance getMultiMap "customers")]
      ;; 特定のキーに紐付く値は、Collectionとして取得できる
      (doseq [col (. customer-multimap get "NEW HORIZEN")]
        (doseq [v col] (println (str "NEW HORIZEN => " v))))

      ;; MultiMapにデータを登録する
      (. customer-multimap put "ISONOKE" {:name "Katsuo"})
      (. customer-multimap put "ISONOKE" {:name "Wakame"})
      (. customer-multimap put "ISONOKE" {:name "Tarao"}))

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))

  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

やっていることは、MapやQueueの例とそう変わりませんが、ここでの出力結果はふたつ目のNodeでは、

NEW HORIZEN => [:name "Kumi"]
NEW HORIZEN => [:name "Ken"]
NEW HORIZEN => [:name "Mike"]

となり、ひとつ目のNodeではクラスタ参加前は

NEW HORIZEN => {:name "Mike"}
NEW HORIZEN => {:name "Kumi"}
NEW HORIZEN => {:name "Ken"}

と出力され、その後

ISONOKE => {:name "Tarao"}
NEW HORIZEN => {:name "Mike"}
NEW HORIZEN => {:name "Kumi"}
NEW HORIZEN => {:name "Ken"}
ISONOKE => {:name "Wakame"}
ISONOKE => {:name "Katsuo"}

となります。entrySetでイテレーションした場合は、こういう結果になるということですね。

Distributed Topic

なんと、Publish/Subscribeモデルが可能なTopicを、Hazelcastは提供しています。ちょっと毛色が違う気がしますが、まあいっかぁと。

Publishする側は、com.hazelcast.core.ITopicにメッセージを登録します。型は何でもいいですが、ITopicな型定義となっているので、その点に注意を。Subscribe側は、com.hazelcast.core.MessageListenerインターフェースを実装したクラスを、ITopicsに登録します。

Publish側がメッセージを送信する前に、ITopicにSubscribe側がリスナを登録していれば、onMessageメソッドでメッセージを購読することができます。

それでは、プログラムを。このプログラムでは、Subscribe側が2つのNodeあることを前提にしています。

1本目。
hazelcast_topic_1.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-topic
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance Message MessageListener ITopic)))
(try
  ;; HazelcastInstanceを作成する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; 他のサーバを待ち合わせ
    (let [xs (. instance getList "counter")]
      (doseq [_ (take-while
                 false?
                 (repeatedly (fn []
                               (do (println "Waiting...")
                                   (Thread/sleep 3000)
                                   (= (count xs) 2)))))]))

    ;; Topicを取得する
    (let [^ITopic topic (. instance getTopic "my-topic")]
      ;; メッセージを送信する
      (. topic publish "Hello")
      (. topic publish "Hoge")
      (. topic publish "Foo")
      (. topic publish "Bar"))

    ;; 他のサーバがメッセージを受信するまで、少し待つ
    (Thread/sleep 15000)

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))

  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

メッセージはITopic#publishで行い、他のNodeがメッセージを受信するのをちょっと待っています。

2本目。
hazelcast_topic_2.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-topic
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance Message MessageListener ITopic)))

(try
  ;; HazelcastInstanceを作成する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; Topicを取得する
    (let [^ITopic topic (. instance getTopic "my-topic")]
      ;; TopicにMessageListenerを登録する
      (. topic addMessageListener (proxy [MessageListener] []
                                      (onMessage [^Message message]
                                                 (println (format "Received Message[%s] => %s"
                                                                  message
                                                                  (. message getMessageObject)))))))

    (. (. instance getList "counter") add "subscriber")

    ;; メッセージを受信しきるまで、擬似的に待つ
    (Thread/sleep 5000)

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))

  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

こちらは、リスナを登録してメッセージを受信したらシャットダウンです。proxy関数で、Javaのインターフェースを実装したクラスを作成しています。

Distributed Set

java.util.Setの実装です。Concurrent系のインターフェースは実装していません。com.hazelcast.core.ISetインターフェースの実装でもあります。

一見普通のSetなのですが、このSetはオブジェクトの比較にequalsではなく、シリアライズしてバイナリ化したオブジェクトのバージョンを比較するそうです。

では、サンプル。

1本目。
hazelcast_set_1.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-set
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance)
           (java.util Set)))

(try
  ;; HazelcastInstanceを取得する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; Setを取得する
    (let [^Set customer-set (. instance getSet "customers")]
      ;; Setにデータを登録する
      (. customer-set add {:name "Taro" :age 22})
      (. customer-set add {:name "Hanako" :age 20})
      (. customer-set add {:name "Ken" :age 18})

      ;; 別のサーバがSetにデータを追加するのを待つ
      (doseq [_ (take-while
                 false?
                 (repeatedly (fn [] (do (println "Waiting...")
                                        (Thread/sleep 3000)
                                        (= (count customer-set) 6)))))])

      ;; 別のサーバシャットダウンするのを待つ
      (Thread/sleep 5000)

      ;; Setの中身を全て表示
      (doseq [e customer-set] (println (format "Entry => %s" e))))

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))

  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

2本目。
hazelcast_set_2.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-set
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance)
           (java.util Set)))

(try
  ;; HazelcastInstanceを取得する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; Setを取得する
    (let [^Set customer-set (. instance getSet "customers")]
      ;; Setの中身を全て表示
      (doseq [e customer-set] (println (format "Entry => %s" e)))

      ;; Setにデータを登録する
      (. customer-set add {:name "Katuo" :age 11})
      (. customer-set add {:name "Wakame" :age 7})
      (. customer-set add {:name "Tarao" :age 3}))

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))

  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

Distributed List

java.util.Listの実装です。Setと同様、Concurrent系のインターフェースは実装していません。com.hazelcast.core.IListインターフェースの実装でもあります。

ちょっと提供されているのが意外なデータ構造でした。

1本目。
hazelcast_list_1.clj

(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-list
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance)
           (java.text SimpleDateFormat)
           (java.util Date List)))

(try
  ;; HazelcastInstanceを取得する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; Listを取得する
    (let [^List datetimes (. instance getList "datetimes")]
      ;; Listにデータを登録する
      (. datetimes add (Date.))
      (. datetimes add (Date.))
      (. datetimes add (Date.))

      ;; 別のサーバがListにデータを追加するのを待つ
      (doseq [_ (take-while
                 false?
                 (repeatedly (fn [] (do (println "Waiting...")
                                        (Thread/sleep 3000)
                                        (= (count datetimes) 6)))))])

      ;; 別のサーバシャットダウンするのを待つ
      (Thread/sleep 5000)

      ;; Listの中身を全て表示
      (doseq [d datetimes]
        (println (format "Entry => %s"
                         (. (SimpleDateFormat. "yyyy/MM/dd HH:mm:ss.SSS") format d)))))

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))

  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

2本目。

hazelcast_list_2.clj 
(require '[leiningen.exec :as exec])

(exec/deps '[[com.hazelcast/hazelcast "3.0.2"]])

(ns hazelcast-list
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance)
           (java.text SimpleDateFormat)
           (java.util Date List)))

(try
  ;; HazelcastInstanceを取得する
  (let [^Config config (Config.)
        ^HazelcastInstance instance (Hazelcast/newHazelcastInstance config)]
    ;; Listを取得する
    (let [^List datetimes (. instance getList "datetimes")]
      ;; Listの中身を全て表示
      ;; Listの中身を全て表示
      (doseq [d datetimes]
        (println (format "Entry => %s"
                         (. (SimpleDateFormat. "yyyy/MM/dd HH:mm:ss.SSS") format d))))

      (Thread/sleep 3000)

      ;; Listにデータを登録する
      (. datetimes add (Date.))
      (. datetimes add (Date.))
      (. datetimes add (Date.))

      ;; 他のサーバが登録したデータを書き換える
      (. (. datetimes get 0) setTime (. (Date.) getTime))

      (println "===========")

      ;; 一応、全部表示してみる
      (doseq [d datetimes]
        (println (format "Entry => %s"
                         (. (SimpleDateFormat. "yyyy/MM/dd HH:mm:ss.SSS") format d)))))

    ;; HazelcastInstanceをシャットダウンする
    (.. instance getLifecycleService shutdown))

  ;; 全Hazelcastのインスタンスをシャットダウンする
  (finally (Hazelcast/shutdownAll)))

今回は、あえてDateといったミュータブルなデータを使用しました。2本目のプログラムで途中で参照しているオブジェクトのフィールドを書き換えていますが、もう1度Listから取得しても、値が書き換わっていません。

こちらは、ふたつ目のNodeの出力結果です。

Entry => 2013/10/12 00:21:49.020
Entry => 2013/10/12 00:21:49.096
Entry => 2013/10/12 00:21:49.097
===========
Entry => 2013/10/12 00:21:49.020
Entry => 2013/10/12 00:21:49.096
Entry => 2013/10/12 00:21:49.097
Entry => 2013/10/12 00:22:00.705
Entry => 2013/10/12 00:22:00.710
Entry => 2013/10/12 00:22:00.712

「===========」の直前にインデックス0の要素の値を変更しているのですが、再取得した場合は最初と同じデータが取得できます。

防御的コピーを行っているってことですね。

以上、Hazelcastが提供するデータ構造ひとめぐりでした。