CLOVER🍀

That was when it all began.

HazelcastのContinuous Queryを試す

Hazelcast 3.0からの新機能、Continuous Queryをちょっと試してみます。

Continuous Query
http://www.hazelcast.com/docs/3.1/manual/single_html/#MapContinuousQuery

ドキュメントにはIMap#addEntryListenerのインターフェースしか記載されていないので、何の機能か最初はよくわからなかったのですが、Oracle Coherenceに同じ用語があったのでそのスライドを見てちょっと分かりました。

Oracle Coherence 概要 - KVS/分散キャッシュがもたらす可能性とは
http://www.slideshare.net/OracleMiddleJP/oracle-coherence-36-kvs

ばっくり言うと、そもそもエントリの追加や削除、更新などのイベントに対して発火させることができるEntryListenerというものがあるのですが、

Distributed Events
http://www.hazelcast.com/docs/3.1/manual/single_html/#Events

この発火条件をQueryで絞り込もうという機能です。対象となるデータ構造は、Distributed Mapですね。

使用するIMap#のメソッドも、Distributed EventsとContinuous Queryでかなり似通っています。

// Distributed Event(Listener)
public String addEntryListener(EntryListener<K,V> listener, boolean includeValue)
public String addEntryListener(EntryListener<K,V> listener, K key, boolean includeValue)

// Continuous Query
public String addEntryListener(EntryListener<K,V> listener, Predicate<K,V> predicate, boolean includeValue)
public String addEntryListener(EntryListener<K,V> listener, Predicate<K,V> predicate, K key, boolean includeValue)

両者の差は、引数にPredicateを受けとるかどうかしかないので。

で、EntryListenrインターフェースはこういう定義になっておりまして

public interface 
    public void entryAdded(EntryEvent<K,V> event);
    public void entryRemoved(EntryEvent<K,V> event);
    public void entryUpdated(EntryEvent<K,V> event);
    public void entryEvicted(EntryEvent<K,V> event);

エントリに対しての、各種イベント通知を受けることができます。EntryEventからは、

  • クラスタ内のどのメンバーでイベントが発生したか
  • イベントの種類
  • キー
  • 値(新/取得可能な場合は旧も)

などを取得可能です。値に対しては、EntryListenerを追加する時のIMap#addEntryListenerにincludeValueという引数がありますが、これをtrueにする必要があります。

…Continuous Queryと言いながら、なんかDistributed Eventsの説明も合わせてる感じですね。

ただ、IMap以外のデータ構造にはItemListenerという別のListnerが追加可能なので、そちらはちょっと違います(Continuous Queryの対象外です)。

というわけで、実はEntryListenerとわかったところで、簡単に試してみましょう。

準備

Leiningenでnew project。

$ lein new app hazelcast-continuous-query

project.clj

project.clj 
(defproject hazelcast-continuous-query "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [com.hazelcast/hazelcast "3.1"]]
  :main ^:skip-aot hazelcast-continuous-query.core
  :target-path "target/%s"
  :aot :all
  :profiles {:uberjar {:aot :all}})

Predicateを使うということは、なにかしらQueryに対するデータオブジェクトを定義する必要があります。ここは、前にQueryを試した時のサンプルと同様、書籍ネタで。
src/hazelcast_continuous_query/book.clj

(ns hazelcast-continuous-query.book
  (:import (java.io Serializable)
           (java.util Map)))

(gen-class :name hazelcast-continuous-query.book.Book
           :implements [java.io.Serializable]
           :state state
           :init init
           :constructors {[java.util.Map] []}
           :methods [[getIsbn13 [] String]
                     [getName [] String]
                     [getPrice [] int]
                     [getPublishDate [] String]
                     [getCategory [] String]
                     [isOutOfPrint [] boolean]])

(defn -init [underlying]
  [[] underlying])

(defn -getIsbn13 [this]
  (:isbn13 (. this state)))

(defn -getName [this]
  (:name (. this state)))

(defn -getPrice [this]
  (:price (. this state)))

(defn -getPublishDate [this]
  (:publish-date (. this state)))

(defn -getCategory [this]
  (:category (. this state)))

(defn -isOutOfPrint [this]
  (:out-of-print (. this state)))

(defn -toString [this]
  (str (. this state)))

Listenerも、簡単に定義しておきます。
src/hazelcast_continuous_query/listener.clj

(ns hazelcast-continuous-query.listener
  (:import (com.hazelcast.core EntryEvent)
           (java.util Date)))

(gen-class :name hazelcast-continuous-query.listener.MyListener
           :implements [com.hazelcast.core.EntryListener])

(defn- log [^String msg]
  (println (str \[ (.. (Thread/currentThread) getName) \ \  msg)))

(defn -entryAdded [this ^EntryEvent event]
  (log (str "added event => " event)))

(defn -entryEvicted [this ^EntryEvent event]
  (log (str "evicted event => " event)))

(defn -entryRemoved [this ^EntryEvent event]
  (log (str "removed event => " event)))

(defn -entryUpdated [this ^EntryEvent event]
  (log (str "updated event => " event)))

イベントを受け取ったら、動作しているスレッド名と共にEntryEventをコンソールに出力するだけの、簡単なListenerです。

使ってみる

では、お馴染みと化してきたような、こんな感じのメインスクリプトを用意します。
src/hazelcast_continuous_query/core.clj

(ns hazelcast-continuous-query.core
  (:gen-class)
  (:import (com.hazelcast.config Config)
           (com.hazelcast.core Hazelcast HazelcastInstance IMap)
           (com.hazelcast.query SqlPredicate)
           (hazelcast-continuous-query.book Book)
           (hazelcast-continuous-query.listener MyListener)))

(defn with-hazelcast [func]
  (try
    (let [^Config config (Config.)
          ^HazelcastInstance hazelcast (Hazelcast/newHazelcastInstance config)]
      (func hazelcast)
      ;; HazelcastInstanceをシャットダウン
      (.. hazelcast getLifecycleService shutdown))
    ;; 全Hazelcastインスタンスをシャットダウン
    (finally (Hazelcast/shutdownAll))))

(defn -main
  [& args]
  (with-hazelcast
    (fn [hazelcast]
      (let [^IMap book-map (. hazelcast getMap "book-map")]
        ;; ここをちょこちょこと変える

        ;; 適当に書籍を登録
        (. book-map put "978-1782167303" (Book. {:isbn13 "978-1782167303"
                                                 :name "Getting Started with Hazelcast"
                                                 :price 4147
                                                 :publish-date "2013-08-27"
                                                 :category "IMDG"
                                                 :out-of-print false}))
        (. book-map put "978-1849518222" (Book. {:isbn13 "978-1849518222"
                                                 :name "Infinispan Data Grid Platform"
                                                 :price 3115
                                                 :publish-date "2012-06-30"
                                                 :category "IMDG"
                                                 :out-of-print false}))
        (. book-map put "978-4274069130" (Book. {:isbn13 "978-4274069130"
                                                 :name "プログラミングClojure 第2版"
                                                 :price 3570
                                                 :publish-date "2013-04-26"
                                                 :category "Clojure"
                                                 :out-of-print false}))
        (. book-map put "978-4774159911" (Book. {:isbn13 "978-4774159911"
                                                 :name "おいしいClojure入門"
                                                 :price 2919
                                                 :publish-date "2013-09-26"
                                                 :category "Clojure"
                                                 :out-of-print false}))
        (. book-map put "978-4774127804" (Book. {:isbn13 "978-4774127804"
                                                 :name "Apache Lucene 入門 〜Java・オープンソース・全文検索システムの構築"
                                                 :price 3360
                                                 :publish-date "2006-05-17"
                                                 :category "FullTextSearch"
                                                 :out-of-print true}))
        (. book-map put "978-4774141756" (Book. {:isbn13 "978-4774141756"
                                                 :name "Apache Solr入門 ―オープンソース全文検索エンジン"
                                                 :price 3780
                                                 :publish-date "2010-02-20"
                                                 :category "FullTextSearch"
                                                 :out-of-print false}))

        (Thread/sleep 3000)))))

最後にスリープが入っているのは、EntryListenerが別スレッドで動作しているようなので、少し待ってあげないと結果が出力されないことがあったからです…。

では、この部分を変えていってみましょう。

      (let [^IMap book-map (. hazelcast getMap "book-map")]
        ;; ここをちょこちょこと変える
Continuous Queryを付ける

先ほどのコメントしているところを、以下のように変更します。

      (let [^IMap book-map (. hazelcast getMap "book-map")]
        ;; Continuous Queryを使うために、EntryListenerを登録
        (. book-map addEntryListener (MyListener.) (SqlPredicate. "price > 3500") true)

SqlPredicateを使って、IMapに登録されるエントリのうち、価格が3,500円以上のものに対して、EntryListenerを起動します。

動かしてみましょう。

$ lein run

Listenerに通知される、コンソール出力されたものはこのようになりました。

[hz._hzInstance_1_dev.cached.thread-1  added event => EntryEvent {Address[192.168.129.129]:5701} key=978-4774141756, oldValue=null, value={:isbn13 "978-4774141756", :publish-date "2010-02-20", :name "Apache Solr入門 ―オープンソース全文検索エンジン", :out-of-print false, :price 3780, :category "FullTextSearch"}, event=ADDED, by Member [192.168.129.129]:5701 this[hz._hzInstance_1_dev.cached.thread-3  added event => EntryEvent {Address[192.168.129.129]:5701} key=978-1782167303, oldValue=null, value={:isbn13 "978-1782167303", :publish-date "2013-08-27", :name "Getting Started with Hazelcast", :out-of-print false, :price 4147, :category "IMDG"}, event=ADDED, by Member [192.168.129.129]:5701 this

[hz._hzInstance_1_dev.cached.thread-2  added event => EntryEvent {Address[192.168.129.129]:5701} key=978-4274069130, oldValue=null, value={:isbn13 "978-4274069130", :publish-date "2013-04-26", :name "プログラミングClojure 第2版", :out-of-print false, :price 3570, :category "Clojure"}, event=ADDED, by Member [192.168.129.129]:5701 this

もうちょっと変えてみましょう。絶版の本だけを対象にします。

      (let [^IMap book-map (. hazelcast getMap "book-map")]
        ;; Continuous Queryを使うために、EntryListenerを登録
        (. book-map addEntryListener (MyListener.) (SqlPredicate. "outOfPrint") true)

出力結果は、こうなりました。

[hz._hzInstance_1_dev.cached.thread-3  added event => EntryEvent {Address[192.168.129.129]:5701} key=978-4774127804, oldValue=null, value={:isbn13 "978-4774127804", :publish-date "2006-05-17", :name "Apache Lucene 入門 〜Java・オープンソース・全文検索システムの構築", :out-of-print true, :price 3360, :category "FullTextSearch"}, event=ADDED, by Member [192.168.129.129]:5701 this

試してはいませんが、SqlPredicateを使うのではなく、Criteria APIを使ったやり方でもOKのはずです。引数の型はPredicateなので。

Distributed Eventsとして動作させる

先ほどの説明で、Continuous QueryはEntryListenerのイベント通知条件を絞り込んでいるだけだという話でしたので、Predicateを外せば単にIMapで発生するイベントに対するListenerになります。

例えば、このように。

      (let [^IMap book-map (. hazelcast getMap "book-map")]
        ;; こちらは、普通のEntryListener
        (. book-map addEntryListener (MyListener.) true)

結果は、このように。

[hz._hzInstance_1_dev.cached.thread-3  added event => EntryEvent {Address[192.168.129.129]:5701} key=978-1782167303, oldValue=null, value={:isbn13 "978-1782167303", :publish-date "2013-08-27", :name "Getting Started with Hazelcast", :out-of-print false, :price 4147, :category "IMDG"}, event=ADDED, by Member [192.168.129.129]:5701 this
[hz._hzInstance_1_dev.cached.thread-4  added event => EntryEvent {Address[192.168.129.129]:5701} key=978-4774127804, oldValue=null, value={:isbn13 "978-4774127804", :publish-date "2006-05-17", :name "Apache Lucene 入門 〜Java・オープンソース・全文検索システムの構築", :out-of-print true, :price 3360, :category "FullTextSearch"}, event=ADDED, by Member [192.168.129.129]:5701 this
[hz._hzInstance_1_dev.cached.thread-2  added event => EntryEvent {Address[192.168.129.129]:5701} key=978-1849518222, oldValue=null, value={:isbn13 "978-1849518222", :publish-date "2012-06-30", :name "Infinispan Data Grid Platform", :out-of-print false, :price 3115, :category "IMDG"}, event=ADDED, by Member [192.168.129.129]:5701 this
[hz._hzInstance_1_dev.cached.thread-4  added event => EntryEvent {Address[192.168.129.129]:5701} key=978-4774141756, oldValue=null, value={:isbn13 "978-4774141756", :publish-date "2010-02-20", :name "Apache Solr入門 ―オープンソース全文検索エンジン", :out-of-print false, :price 3780, :category "FullTextSearch"}, event=ADDED, by Member [192.168.129.129]:5701 this
[hz._hzInstance_1_dev.cached.thread-2  added event => EntryEvent {Address[192.168.129.129]:5701} key=978-4774159911, oldValue=null, value={:isbn13 "978-4774159911", :publish-date "2013-09-26", :name "おいしいClojure入門", :out-of-print false, :price 2919, :category "Clojure"}, event=ADDED, by Member [192.168.129.129]:5701 this
[hz._hzInstance_1_dev.cached.thread-1  added event => EntryEvent {Address[192.168.129.129]:5701} key=978-4274069130, oldValue=null, value={:isbn13 "978-4274069130", :publish-date "2013-04-26", :name "プログラミングClojure 第2版", :out-of-print false, :price 3570, :category "Clojure"}, event=ADDED, by Member [192.168.129.129]:5701 this

すべてのエントリに対するイベントが出力されました。

イベント通知は非同期のようなので、必ずしもIMapに対する操作順にイベントが通知されるようではないみたいですね。putした順と、コンソール出力される順番はバラバラでしたし。

ところで、両方ともキーを指定してEntryListenerを登録するインターフェースがあるのですが、こちらはContinuous Queryの方はうまく動かせなかったので諦めました。通常のDistributed Eventsとして使う分には、特定のキーに反応してリスナーが起動したのですが…?