CLOVER🍀

That was when it all began.

Apache Geodeでイベント処理

Apache Geodeには、Regionへのエントリの追加、削除などのイベントに対して紐付けられる、イベントハンドリングの仕組みがあります。

http://geode.docs.pivotal.io/docs/developing/events/chapter_overview.html

概要としては、こちらを見るのがよいでしょう。

http://geode.docs.pivotal.io/docs/developing/events/how_events_work.html

Apache Geode内の分散システム内で、Cacheに関するイベントを受け取ることができます。Peer-to-Peerであっても、Client/Serverであっても利用でき、またクラスタ内の他のメンバーで発生したイベントも受け取ることができるようです。

特徴は、上記ページによるとこんな感じ。

  • イベントベース
  • 合成可能な非同期イベント通知
  • 低レンテンシな同期イベント通知
  • 冗長化された可用性の高いメッセージングキュー
  • イベント発生順かつ、1度だけの配信
  • 分散イベント通知
  • サブスクリプションの永続化

いろいろありますね…。

また、イベントには以下の2種類があるようです。

  • アプリケーションがCacheを操作することにより発生するイベントで、データの変更について詳細に受け取ることが可能
  • 管理系のAPIの利用により発生するイベント

いずれのイベントも、単一のメンバーの操作により発生します。これらのイベントはどちらか片方を扱うことができ、(ひとつのメンバーでは)両方を一緒に扱うことはできないようです。Cacheに関するイベントと管理系のイベントを別々に順序管理しているため、予期しない結果を引き起こす可能性があるから、らしいです。

イベントのライフサイクルとしては、次の順番になります。

  1. データのputやCacheのcloseといったオペレーションを開始します
  2. オペレーションを行うことで、イベントのトリガーとなったメソッドの詳細についてのオブジェクト、オペレーションを行ったメンバーやRegionに関する情報といったオブジェクトが生成されます
  3. イベントハンドラが呼び出され、イベントオブジェクトが渡されます。イベントごとに、異なるイベントハンドラが必要であり、対応するイベントハンドラが存在しない場合は特に何も起こりません
  4. イベントハンドラがイベントを受信すると、対応するイベントのコールバックメソッドを呼び出します。イベントハンドラの呼び出しタイミングがオペレーションの前なのか後なのかは、イベントの種類に依存し、またタイミングもイベントハンドラに依存します
  5. オペレーションが分散系のものだった場合、他のメンバーでも同じように処理することができるイベントを生成します

要するに、関心のある操作に対してイベントハンドラを登録すると、Apache Geodeが適切にイベントを判定してイベントハンドラを呼び出してくれますってことですね。

ドキュメント上の説明はまだまだ続きますが、とりあえずこれくらいにして実際に使ってみましょう。

今回は、Peer-to-Peerでの利用とします。また、分散環境では使用せず、単一のメンバーで実行してみます。

準備

まずは、Maven依存関係から。

        <dependency>
            <groupId>org.apache.geode</groupId>
            <artifactId>gemfire-core</artifactId>
            <version>1.0.0-incubating.M1</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.3.0</version>
            <scope>test</scope>
        </dependency>

利用するApache Geodeバージョンは、1.0.0-incubating.M1とします。JUnitとAssertJは、テストコード用です。

イベントハンドラの実装

それでは、イベントハンドラを実装します。

http://geode.docs.pivotal.io/docs/developing/events/implementing_cache_event_handlers.html

どのようなイベントに関連するイベントハンドラを作成したいかで、実装方法が変わります。

同期なのか非同期なのか、Region操作に関するものなのか、メンバーに関するものなのか、トランザクションに関するものなのか。

イベントハンドラは特定のインターフェースを実装して実現しますが、CacheやRegionに関するインターフェースの頂点にいるのはCacheCallbackインターフェースです。利用したいイベントハンドラに応じて、CacheCallbackインターフェースの派生インターフェースを実装します。主なイベントハンドラと関連するイベントに関する一覧は、こちら。

http://geode.docs.pivotal.io/docs/developing/events/list_of_event_handlers_and_events.html

CacheおよびRegionに関する操作であれば主にCacheListener、非同期操作であればAsyncEventListenerを実装します。

CacheWriterやCacheLoaderもこのイベントハンドリングについてのカテゴライズとされていて、やっぱりCacheCallbackのサブインターフェースとなっています。

Read Through、Write Through、Write Behindはこの仕組み上で実装することになるわけですね。Write Behindは、AsyncEventListenerとして実装することになるようですが。

http://geode.docs.pivotal.io/docs/developing/events/implementing_write_behind_event_handler.html

CacheListenerを実装する

まずはCacheListenerを実装して、Regionに対する追加、更新、削除に対するイベントを扱ってみたいと思います。

実際にはCacheListenerインターフェースを直接実装するのではなく、CacheListenerAdapterクラスを継承して、必要なイベント操作のみ実装します。
src/main/java/org/littlewings/geode/event/MyCacheListener.java

package org.littlewings.geode.event;

import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;

public class MyCacheListener extends CacheListenerAdapter<String, String> {
    @Override
    public void afterCreate(EntryEvent<String, String> event) {
        KeyValueStore<String, String> store = KeyValueStore.getInstance("create-event-store");

        String key = event.getKey();
        String oldValue = event.getOldValue();
        String newValue = event.getNewValue();

        store.put(key, oldValue + ":" + newValue);
    }

    @Override
    public void afterUpdate(EntryEvent<String, String> event) {
        KeyValueStore<String, String> store = KeyValueStore.getInstance("update-event-store");

        String key = event.getKey();
        String oldValue = event.getOldValue();
        String newValue = event.getNewValue();

        store.put(key, oldValue + ":" + newValue);
    }

    @Override
    public void afterDestroy(EntryEvent<String, String> event) {
        KeyValueStore<String, String> store = KeyValueStore.getInstance("destroy-event-store");

        String key = event.getKey();
        String oldValue = event.getOldValue();
        String newValue = event.getNewValue();

        store.put(key, oldValue + ":" + newValue);
    }
}

各メソッドの引数として、EntryEventが渡ってくるので、そこでキーや変更前後の値が取得できます。値については両方のものがいつも取れるとは限らず、例えば新規エントリ作成時であればNewValueが取れるだけになります。

なんか妙なクラスが間に挟まっていますが、これはテスト時に呼び出し内容を確認するためのクラスで、パラメーターなどを保存しています。実装はこんな感じです。
src/main/java/org/littlewings/geode/event/KeyValueStore.java

package org.littlewings.geode.event;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

public class KeyValueStore<K, V> {
    private static final Map<String, KeyValueStore<?, ?>> INSTANCES = new ConcurrentHashMap<>();
    private Map<K, V> store = new ConcurrentHashMap<>();


    protected KeyValueStore() {
    }

    public static <K, V> KeyValueStore getInstance(String name) {
        return (KeyValueStore<K, V>) INSTANCES.computeIfAbsent(name, key -> new KeyValueStore<K, V>());
    }

    public static void clear() {
        INSTANCES.clear();
    }

    public void put(K key, V value) {
        store.put(key, value);
    }

    public V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) {
        return store.compute(key, remappingFunction);
    }

    public V get(K key) {
        return store.get(key);
    }

    public int size() {
        return store.size();
    }
}

それでは、これらを使って動作確認してみましょう。

テストコードの雛形

src/test/java/org/littlewings/geode/event/EventHandlingTest.java

package org.littlewings.geode.event;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import org.junit.Before;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class EventHandlingTest {
    @Before
    public void setUp() {
        KeyValueStore.clear();
    }

    // ここに、テストを書く!
}

Listenerで呼び出し内容を保存する内容を、毎回クリアするようにしてるだけですね。

動作確認

では、作成したCacheListenerの動作確認をしてみましょう。

作成したテストコードは、こちら。

    @Test
    public void cacheListener() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache
                            .<String, String>createRegionFactory(RegionShortcut.PARTITION)
                            .addCacheListener(new MyCacheListener())
                            .create("sampleRegion");

            // create
            region.put("key1", "value1");

            assertThat(KeyValueStore.getInstance("create-event-store").get("key1"))
                    .isEqualTo("null:value1");
            assertThat(KeyValueStore.getInstance("update-event-store").size())
                    .isEqualTo(0);
            assertThat(KeyValueStore.getInstance("destroy-event-store").size())
                    .isEqualTo(0);


            // update
            region.put("key1", "value1-1");

            assertThat(KeyValueStore.getInstance("create-event-store").get("key1"))
                    .isEqualTo("null:value1");
            assertThat(KeyValueStore.getInstance("update-event-store").get("key1"))
                    .isEqualTo("value1:value1-1");
            assertThat(KeyValueStore.getInstance("destroy-event-store").size())
                    .isEqualTo(0);


            // destroy
            region.remove("key1");

            assertThat(KeyValueStore.getInstance("create-event-store").get("key1"))
                    .isEqualTo("null:value1");
            assertThat(KeyValueStore.getInstance("update-event-store").get("key1"))
                    .isEqualTo("value1:value1-1");
            assertThat(KeyValueStore.getInstance("destroy-event-store").get("key1"))
                    .isEqualTo("value1-1:null");
        }
    }

Region作成時にCacheListenerをaddすることで、CacheListenerを登録することができます。

            Region<String, String> region =
                    cache
                            .<String, String>createRegionFactory(RegionShortcut.PARTITION)
                            .addCacheListener(new MyCacheListener())
                            .create("sampleRegion");

あとのコードはReginに対してput/removeしているだけですが、イベントごとにCacheListenerが呼び出されていることがわかります。

Cache XMLでCacheListenerを定義する

CacheListenerを、Cache XMLで定義してみます。

Cache XMLにCacheListenerを登録するのですが、この場合はCacheListenerがDeclarableインターフェースを実装しておく必要があります。今回は、簡単のため先ほど作成したCacheListenerを継承し、そこにDeclarableインターフェースを実装する形にしました。
src/main/java/org/littlewings/geode/event/MyDeclaredCacheListener.java

package org.littlewings.geode.event;

import java.util.Properties;

import com.gemstone.gemfire.cache.Declarable;

public class MyDeclaredCacheListener extends MyCacheListener implements Declarable {
    @Override
    public void init(Properties props) {
        // no-op
    }
}

initメソッドでは、Cache XMLで定義したプロパティを受け取ることができます。が、今回は端折ります…。

Cache XMLは、このように定義。
src/test/resources/cache-predefined-listener.xml

<?xml version="1.0" encoding="UTF-8"?>
<cache
        xmlns="http://schema.pivotal.io/gemfire/cache"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-8.1.xsd"
        version="8.1">
    <region name="sampleRegion" refid="PARTITION">
        <region-attributes>
            <cache-listener>
                <class-name>org.littlewings.geode.event.MyDeclaredCacheListener</class-name>
            </cache-listener>
            <!-- multiple cache listener
            <cache-listener>
                <class-name>XYZ</class-name>
            </cache-listener>
            -->
        </region-attributes>
    </region>
</cache>

コメントアウトしていますが、cache-listenerタグを並べることで、複数のCacheListenerを登録することができます。

動作確認用のテストコードは、こちら。

    @Test
    public void declaredCacheListener() {
        try (Cache cache = new CacheFactory().set("cache-xml-file", "cache-predefined-listener.xml").create()) {
            Region<String, String> region = cache.<String, String>getRegion("sampleRegion");

            // create
            region.put("key1", "value1");

            assertThat(KeyValueStore.getInstance("create-event-store").get("key1"))
                    .isEqualTo("null:value1");
            assertThat(KeyValueStore.getInstance("update-event-store").size())
                    .isEqualTo(0);
            assertThat(KeyValueStore.getInstance("destroy-event-store").size())
                    .isEqualTo(0);


            // update
            region.put("key1", "value1-1");

            assertThat(KeyValueStore.getInstance("create-event-store").get("key1"))
                    .isEqualTo("null:value1");
            assertThat(KeyValueStore.getInstance("update-event-store").get("key1"))
                    .isEqualTo("value1:value1-1");
            assertThat(KeyValueStore.getInstance("destroy-event-store").size())
                    .isEqualTo(0);


            // destroy
            region.remove("key1");

            assertThat(KeyValueStore.getInstance("create-event-store").get("key1"))
                    .isEqualTo("null:value1");
            assertThat(KeyValueStore.getInstance("update-event-store").get("key1"))
                    .isEqualTo("value1:value1-1");
            assertThat(KeyValueStore.getInstance("destroy-event-store").get("key1"))
                    .isEqualTo("value1-1:null");
        }
    }

今回はRegionは定義済みなので、Cacheから取得するのみです。

        try (Cache cache = new CacheFactory().set("cache-xml-file", "cache-predefined-listener.xml").create()) {
            Region<String, String> region = cache.<String, String>getRegion("sampleRegion");

以降の動作については、APIでaddCacheListenerした時と同じ結果になります。

CacheLoaderを実装する

もうひとつサンプルとして、CacheLoaderを実装してみましょう。

CacheLoaderを作成するには、CacheLoaderインターフェースを実装したクラスを作成します。以下にあるように、Region#getがnullを返す場合にCacheLoaderが呼び出され、キーに対応する値を生成する役割を担います。

Allows data from outside of the VM to be placed into a region. When Region.get(Object) is called for a region entry that has a null value, the load method of the region's cache loader is invoked. The load method creates the value for the desired key by performing an operation such as a database query. The load may also perform a net search that will look for the value in a cache instance hosted by another member of the distributed system.

http://data-docs-samples.cfapps.io/docs-gemfire/latest/javadocs/japi/com/gemstone/gemfire/cache/CacheLoader.html

で、作成したCacheLoaderはこちら。
src/main/java/org/littlewings/geode/event/MyCacheLoader.java

package org.littlewings.geode.event;

import com.gemstone.gemfire.cache.CacheLoader;
import com.gemstone.gemfire.cache.CacheLoaderException;
import com.gemstone.gemfire.cache.LoaderHelper;

public class MyCacheLoader implements CacheLoader<String, String> {
    @Override
    public String load(LoaderHelper<String, String> helper) throws CacheLoaderException {
        String key = helper.getKey();

        KeyValueStore<String, Integer> store = KeyValueStore.getInstance("load-counter");
        store.compute(key, (k, v) -> (v == null) ? 1 : v + 1);

        Thread.dumpStack();

        return key + "-value-loaded";
    }

    @Override
    public void close() {
        // no-op
    }
}

今回は簡単に、キーに対して機械的に値を作成するのみにしました。あと、このCacheLoaderが何回呼び出されたかのカウントを取るようにしています。1度値を返した後は、エントリが残っている限りは呼び出されないはずなので。

確認。

    @Test
    public void cacheLoader() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache
                            .<String, String>createRegionFactory(RegionShortcut.PARTITION)
                            .setCacheLoader(new MyCacheLoader())
                            .create("sampleRegion");

            assertThat(region.get("key1"))
                    .isEqualTo("key1-value-loaded");
            assertThat(KeyValueStore.getInstance("load-counter").get("key1"))
                    .isEqualTo(1);
            region.get("key1");
            assertThat(KeyValueStore.getInstance("load-counter").get("key1"))
                    .isEqualTo(1);  // not-increment

            assertThat(region.get("key2"))
                    .isEqualTo("key2-value-loaded");
            assertThat(region.get("key3"))
                    .isEqualTo("key3-value-loaded");

            assertThat(KeyValueStore.getInstance("load-counter").get("key2"))
                    .isEqualTo(1);
            assertThat(KeyValueStore.getInstance("load-counter").get("key3"))
                    .isEqualTo(1);
        }
    }

CacheLoaderの適用は、Region作成時に行います。

            Region<String, String> region =
                    cache
                            .<String, String>createRegionFactory(RegionShortcut.PARTITION)
                            .setCacheLoader(new MyCacheLoader())
                            .create("sampleRegion");

今回のCacheLoaderがいると、Region#putしていないのにいきなり値が取れるようになります。

            assertThat(region.get("key1"))
                    .isEqualTo("key1-value-loaded");

1度CacheLoaderが値を返した後は、2度目以降のRegion#getの呼び出しではCacheLoaderの呼び出しは行われません。

            assertThat(KeyValueStore.getInstance("load-counter").get("key1"))
                    .isEqualTo(1);
            region.get("key1");
            assertThat(KeyValueStore.getInstance("load-counter").get("key1"))
                    .isEqualTo(1);  // not-increment

Cache XMLでCacheLoaderを定義する

最後に、Cache XMLでもCacheLoaderを定義してみます。

CacheListenerと同様、Declarableインターフェースを実装する必要があります。こちらも、先ほど作成したCacheLoaderを継承する形で簡単に済ませます。
src/main/java/org/littlewings/geode/event/MyDeclaredCacheLoader.java

package org.littlewings.geode.event;

import java.util.Properties;

import com.gemstone.gemfire.cache.Declarable;

public class MyDeclaredCacheLoader extends MyCacheLoader implements Declarable {
    @Override
    public void init(Properties props) {
        // no-op
    }
}

Cache XMLでは、以下のようにしてCacheLoaderを設定します。
src/test/resources/cache-predefined-loader.xml

<?xml version="1.0" encoding="UTF-8"?>
<cache
        xmlns="http://schema.pivotal.io/gemfire/cache"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-8.1.xsd"
        version="8.1">
    <region name="sampleRegion" refid="PARTITION">
        <region-attributes>
            <cache-loader>
                <class-name>org.littlewings.geode.event.MyDeclaredCacheLoader</class-name>
            </cache-loader>
        </region-attributes>
    </region>
</cache>

CacheLoaderはCacheListnerと異なり、複数設定することができません。Region作成時も、"setCacheLoader"でしたからね…。

テストコードについては、Cache XMLを指定して作成済みのRegionを取得する以外は、ほとんど変わりません。

    @Test
    public void declaredCacheLoader() {
        try (Cache cache = new CacheFactory().set("cache-xml-file", "cache-predefined-loader.xml").create()) {
            Region<String, String> region = cache.<String, String>getRegion("sampleRegion");

            assertThat(region.get("key1"))
                    .isEqualTo("key1-value-loaded");
            assertThat(KeyValueStore.getInstance("load-counter").get("key1"))
                    .isEqualTo(1);
            region.get("key1");
            assertThat(KeyValueStore.getInstance("load-counter").get("key1"))
                    .isEqualTo(1);  // not-increment

            assertThat(region.get("key2"))
                    .isEqualTo("key2-value-loaded");
            assertThat(region.get("key3"))
                    .isEqualTo("key3-value-loaded");

            assertThat(KeyValueStore.getInstance("load-counter").get("key2"))
                    .isEqualTo(1);
            assertThat(KeyValueStore.getInstance("load-counter").get("key3"))
                    .isEqualTo(1);
        }
    }

テスト結果です。

まとめ

Apache Geodeの、イベントハンドリングの初歩的なものを試してみました。とりあえず、簡単な使い方はわかった感じかなと思います。

とはいえ、やっていないこともたくさんあります。

Client/Serverはやっていませんし、クラスタリングも扱っていません。

http://geode.docs.pivotal.io/docs/developing/events/how_client_server_distribution_works.html

http://geode.docs.pivotal.io/docs/developing/events/how_cache_events_work.html

http://geode.docs.pivotal.io/docs/developing/events/configure_p2p_event_messaging.html

非同期イベントハンドラも、扱いませんでした。

http://geode.docs.pivotal.io/docs/developing/events/implementing_write_behind_event_handler.html

Client/Serverにおいては、HA、キューのサイズ指定などテーマはまだまだたくさんあります。

http://geode.docs.pivotal.io/docs/developing/events/configuring_highly_available_servers.html

といっても概要はなんとなくわかった気がするので、この後は必要になることがあれば、でしょうかね。