CLOVER🍀

That was when it all began.

Apache GeodeのContinuous Queryを試す

Apache Geode 1.0.0-incubating.M2がリリースされました。

Announcing Apache Geode Milestone Releases 1.0.0-incubating M1 & M2 – Seeking Testers : Apache Geode

Release Notesは、こちら。

Release Notes - ASF JIRA

Spring Data Geodeも出たそうな。

Spring Data Geode 1.0.0.APACHE-GEODE-INCUBATING-M2 Released

1.0.0-incubating.M2からの新しい機能としては、

  • Continuous Query(CQ)
  • WAN Replication
  • Pulse(Management UI)

が挙げられるようです。

このブログではWAN Replicationなど縁がありませんが、今回はContinuous Queryを試してみたいと思います。

Continuous Queryとは?

すごいざっくり言うと、OQLで受信するイベントを絞り込めるListenerです。

http://geode.docs.pivotal.io/docs/developing/continuous_querying/how_continuous_querying_works.html

Continuous Queryを使うことで、Server側で発生したイベントをすべて受信するのではなく、Client側が興味のあるイベントのみに絞り込んで受信することができるようになります。

Continuous Queryが使えるのは、Client/Server構成の時のみのようです。

Use continuous querying in your clients to receive continuous updates to queries run on the servers.

CQs are only run by a client on its servers.

http://geode.docs.pivotal.io/docs/developing/continuous_querying/implementing_continuous_querying.html

そうなんですねー。GemFireでもそうみたいです。

http://gemfire.docs.pivotal.io/docs-gemfire/developing/continuous_querying/implementing_continuous_querying.html

CQの動作自体は、「Data Flow with CQs」という部分を読むとよいと思います。
http://geode.docs.pivotal.io/docs/developing/continuous_querying/how_continuous_querying_works.html

概ね、こんな感じみたいです。

  1. Client、もしくはServer(Peer-to-Peer)からエントリの更新イベントが発生する
  2. Server側でCQフレームワークが実行され、各イベントがクエリの条件にマッチするかチェックされる
  3. 新または旧の値がCQの条件を満たしていた場合は、Client側のListenerにイベントが送信され、Listenerはイベントを受け取る

Listenerのイベントの受け取り方としては、

  • クエリの条件を満たすエントリが登録された場合はINSERT(CREATE)
  • 新旧両方の値がクエリの条件を満たす場合はUPDATE
  • クエリの条件を満たしていたエントリが削除された場合はDELETE(DESTROY)

となるようです。このあたりは、あとで動作確認しつつ見ていきましょう。

書けるクエリについてですが、一応制限があるようです。

  • 対象のRegionがPartitioned Region、またはReplicated Regionであること
  • FROM句が単一のRegionを指していること
  • SELECTのみ利用可能(FROM句に式を置くことも不可)
  • RegionのCross Join、ネストされたコレクションへのドリルダウン、DISTINCT、Projection、パラメーターのバインドは不可

http://geode.docs.pivotal.io/docs/developing/continuous_querying/implementing_continuous_querying.html

まあ、Listener的な性格からすると、予想できそうな制限ですね。

というわけで、説明はこのくらいにして実際に使っていってみましょう。

準備

まずは、Maven依存関係から。

        <dependency>
            <groupId>org.apache.geode</groupId>
            <artifactId>geode-core</artifactId>
            <version>1.0.0-incubating.M2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.geode</groupId>
            <artifactId>geode-cq</artifactId>
            <version>1.0.0-incubating.M2</version>
        </dependency>

使用するApache Geodeバージョンは、1.0.0-incubating.M2です。

Continuous Queryを使うためには、「geode-core」と「geode-cq」の2つが必要です。

コンパイルはcoreがあればできるのですが、それだけだと実行時にエラーになるので、実装が含まれたcqモジュールが必要です。なのですが、cqだけだとcoreへの依存関係が入らないという…。自分で明示的に加える必要があります。

あとはテストコード用に、以下を追加。

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.4.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.zeroturnaround</groupId>
            <artifactId>zt-exec</artifactId>
            <version>1.9</version>
            <scope>test</scope>
        </dependency>

外部プロセス実行用に、ZT Process Executorも加えていますが、こちらも後で。

Clinet/Server構成で使用するので、ダウンロードページからApache Geodeのバイナリも取得しておきます。

Releases

展開。

$ tar -zxvf apache-geode-1.0.0-incubating.M2.tar.gz
$ cd apache-geode-1.0.0-incubating.M2

はじめてのContinuous Query

EntityとCqListenerの実装の作成

OQLで絞り込みを行うので、対象のEntityとなるクラスを作成します。テーマは書籍でいきます。
src/main/java/org/littlewings/geode/cq/Book.java

package org.littlewings.geode.cq;

import java.io.Serializable;

public class Book implements Serializable {
    private static final long serialVersionUID = 1L;

    private String isbn;

    private String title;

    private Integer price;

    public static Book create(String isbn, String title, Integer price) {
        Book book = new Book();
        book.setIsbn(isbn);
        book.setTitle(title);
        book.setPrice(price);
        return book;
    }

    public String getIsbn() {
        return isbn;
    }

    public void setIsbn(String isbn) {
        this.isbn = isbn;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public Integer getPrice() {
        return price;
    }

    public void setPrice(Integer price) {
        this.price = price;
    }
}

そして、以下のドキュメントを参考にContinuous Queryで受信対象となったイベントを受け取るためのクラスを作成します。CqListenerというインターフェースを実装したクラスを作成する必要があるようです。
※Serverの接続/離脱を検知したい場合はCqStatusListenerインターフェースを実装する必要があるようですが、今回は対象外とします

http://geode.docs.pivotal.io/docs/developing/continuous_querying/implementing_continuous_querying.html

で、今回作成したCqListenerの実装クラス。
src/main/java/org/littlewings/geode/cq/MyCqListenerImpl.java

package org.littlewings.geode.cq;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.query.CqEvent;
import com.gemstone.gemfire.cache.query.CqListener;

public class MyCqListenerImpl implements CqListener {
    private List<Map<String, Object>> createdEntries = Collections.synchronizedList(new ArrayList<>());
    private List<Map<String, Object>> updatedEntries = Collections.synchronizedList(new ArrayList<>());
    private List<Map<String, Object>> deletedEntries = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void onEvent(CqEvent cqEvent) {
        Operation queryOperation = cqEvent.getQueryOperation();

        String key = (String) cqEvent.getKey();

        if (queryOperation.isCreate()) {
            createdEntries.add(createEntry(key, (Book) cqEvent.getNewValue()));
        } else if (queryOperation.isUpdate()) {
            updatedEntries.add(createEntry(key, (Book) cqEvent.getNewValue()));
        } else if (queryOperation.isDestroy()) {
            deletedEntries.add(createEntry(key, (Book) cqEvent.getNewValue()));
        }
    }

    protected Map<String, Object> createEntry(String key, Book book) {
        Map<String, Object> map = new HashMap<>();
        map.put("key", key);

        if (book != null) {
            map.put("value", book);
        }

        return map;
    }

    @Override
    public void onError(CqEvent cqEvent) {
        Throwable th = cqEvent.getThrowable();
        th.printStackTrace();
    }

    @Override
    public void close() {
        // no-op
    }

    public List<Map<String, Object>> getCreatedEntries() {
        return createdEntries;
    }

    public List<Map<String, Object>> getUpdatedEntries() {
        return updatedEntries;
    }

    public List<Map<String, Object>> getDeletedEntries() {
        return deletedEntries;
    }
}

イベント受信時に、可能であればキーと値を保存するようにしておきました。まあ、動作確認用です。

なお、Continuous QueryがListenerであることを示すように、CqListenerインターフェースは、イベント処理でコールバックを受けるインターフェースの最も基礎にいる、CacheCallbackインターフェースを継承しています。
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M2/geode-core/src/main/java/com/gemstone/gemfire/cache/query/CqListener.java#L31

Locator/Serverの起動

Continuous QueryはClient/Server構成で動作するということなので、Server側の準備をする必要があります。

gfshを起動して、ここからLocatorとServerを起動しましょう。

$ bin/gfsh

gfsh>start locator --name=locator
gfsh>start server --name=server

Regionも作成します。

gfsh>create region --name=bookRegion --type=PARTITION_REDUNDANT
Member | Status
------ | ----------------------------------------
server | Region "/bookRegion" created on "server"
テストコードで動作確認

では、テストコードでContinuous Queryの動作確認をしてみます。以下のようなクラスを用意。
src/test/java/org/littlewings/geode/cq/ContinuousQuerySimpleTest.java

package org.littlewings.geode.cq;

import java.util.concurrent.TimeUnit;

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.query.CqAttributes;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqException;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.CqResults;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.RegionNotFoundException;
import com.gemstone.gemfire.cache.query.Struct;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class ContinuousQuerySimpleTest {
    private static final Book ELASTICSEARCH_BOOK = Book.create("978-4048662024", "高速スケーラブル検索エンジン ElasticSearch Server", 3024);
    private static final Book SPARK_BOOK = Book.create("978-4873117348", "初めてのSpark", 3456);

    @Test
    public void gettingStarted() throws CqException, RegionNotFoundException {
        try (ClientCache cache = new ClientCacheFactory().setPoolSubscriptionEnabled(true).create()) {
            Region<String, Book> region =
                    cache.<String, Book>createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("bookRegion");

            CqListener cqListener = new MyCqListenerImpl();

            try {
                CqAttributesFactory cqAttrFactory = new CqAttributesFactory();
                cqAttrFactory.addCqListener(cqListener);
                CqAttributes cqAttr = cqAttrFactory.create();

                QueryService queryService = cache.getQueryService();
                CqQuery cqQuery = queryService.newCq("SELECT * FROM /bookRegion b WHERE b.price > 3000 AND b.price < 4000", cqAttr);

                CqResults<Struct> results = cqQuery.executeWithInitialResults();
                assertThat(results).isEmpty();

                region.put(ELASTICSEARCH_BOOK.getIsbn(), ELASTICSEARCH_BOOK);
                TimeUnit.SECONDS.sleep(1L);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .hasSize(1);
                assertThat(((Book) ((MyCqListenerImpl) cqListener).getCreatedEntries().get(0).get("value")).getPrice())
                        .isEqualTo(3024);
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .isEmpty();
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .isEmpty();
            } finally {
                cqListener.close();
            }
        }
    }
}

利用するのはClientCacheですが、まずドキュメントに則ってSubscriptionを有効にする必要があります。

Configure the client pools you will use for CQs with subscription-enabled set to true.

http://geode.docs.pivotal.io/docs/developing/continuous_querying/implementing_continuous_querying.html
        try (ClientCache cache = new ClientCacheFactory().setPoolSubscriptionEnabled(true).create()) {

続いて、実装したCqListenerのインスタンスを作成して、CqAttributeFactoryへ加えてCqAttributeを取得します。

            CqListener cqListener = new MyCqListenerImpl();

            try {
                CqAttributesFactory cqAttrFactory = new CqAttributesFactory();
                cqAttrFactory.addCqListener(cqListener);
                CqAttributes cqAttr = cqAttrFactory.create();

取得したCqAttributeをQueryService#newCqで登録してできあがりです。

                QueryService queryService = cache.getQueryService();
                CqQuery cqQuery = queryService.newCq("SELECT * FROM /bookRegion b WHERE b.price > 3000 AND b.price < 4000", cqAttr);

クエリの条件は、書籍の価格で絞り込むようにしています。

今回はもっとも単純なnewCqメソッドを使用しましたが、クエリに名前を付けられるnewCqメソッドもあるので、通常はそちらを使うのでしょう。
名前を明示しないと、Geode側で自動生成して付与します。
https://github.com/apache/incubator-geode/blob/rel/v1.0.0-incubating.M2/geode-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceImpl.java#L230

で、ここで返ってくるCqQueryに対してexecuteWithInitialResultsメソッドを呼び出すと、イベント受信が始まります。このあと、Regionに対してputやremoveなどの更新処理を行えばOKです。

                CqResults<Struct> results = cqQuery.executeWithInitialResults();

※開始するだけなら、CqQuery#executeでもよい?

なんですけど、このままこのテストコードを動かすと…

Caused by: com.gemstone.gemfire.SerializationException: A ClassNotFoundException was thrown while trying to deserialize cached value.
Caused by: java.lang.ClassNotFoundException: org.littlewings.geode.cq.Book

となり、Server側にEntityをデプロイしておく必要があるようです。

というわけで、Entityのみを含んだJARファイルを作成し

$ jar -cvf entity.jar org/littlewings/geode/cq/Book.class

デプロイします。

gfsh>deploy --jar=/path/to/entity.jar
Member | Deployed JAR | Deployed JAR Location
------ | ------------ | ----------------------------------------------------------------------------------------------------------
server | entity.jar   | /path/to/apache-geode-1.0.0-incubating.M2/server/vf.gf#entity.jar#1

これで、動作するようになりました!

すでに1度プログラムを動かしている場合は、1度Server側をデータクリアのために再起動したりするとよいでしょう。

なお、CqListernerは非同期で動いているようなので、今回は1秒スリープを入れてイベント受信を待つようにしています。

                region.put(ELASTICSEARCH_BOOK.getIsbn(), ELASTICSEARCH_BOOK);
                TimeUnit.SECONDS.sleep(1L);

結果、INSERT(CREATE)イベントが受信できましたよ、と。
※登録したエントリは、クエリの条件を満たすものです

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .hasSize(1);
                assertThat(((Book) ((MyCqListenerImpl) cqListener).getCreatedEntries().get(0).get("value")).getPrice())
                        .isEqualTo(3024);
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .isEmpty();
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .isEmpty();

また、CqQuery#executeWithInitialResultsについてですが、こちらはCqListenerが動作する前に登録されていた、Continuous Queryの条件にマッチするデータが取得できます。

例えば、以下のようなコードにすると

    @Test
    public void gettingStartedBefureQueryExecution() throws CqException, RegionNotFoundException, InterruptedException {
        try (ClientCache cache = new ClientCacheFactory().setPoolSubscriptionEnabled(true).create()) {
            Region<String, Book> region =
                    cache.<String, Book>createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("bookRegion");

            CqListener cqListener = new MyCqListenerImpl();

            try {
                CqAttributesFactory cqAttrFactory = new CqAttributesFactory();
                cqAttrFactory.addCqListener(cqListener);
                CqAttributes cqAttr = cqAttrFactory.create();

                QueryService queryService = cache.getQueryService();
                CqQuery cqQuery = queryService.newCq("SELECT * FROM /bookRegion b WHERE b.price > 3000 AND b.price < 4000", cqAttr);

                region.put(ELASTICSEARCH_BOOK.getIsbn(), ELASTICSEARCH_BOOK);
                TimeUnit.SECONDS.sleep(1L);

                CqResults<Struct> results = cqQuery.executeWithInitialResults();
                assertThat(results).hasSize(1);
                assertThat(((String)results.asList().get(0).get("key")))
                        .isEqualTo("978-4048662024");
                assertThat(((Book)results.asList().get(0).get("value")).getPrice())
                        .isEqualTo(3024);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .isEmpty();
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .isEmpty();
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .isEmpty();

                region.put(SPARK_BOOK.getIsbn(), SPARK_BOOK);
                TimeUnit.SECONDS.sleep(1L);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .hasSize(1);
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .isEmpty();
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .isEmpty();
            } finally {
                cqListener.close();
            }
        }
    }

最初にデータを登録してからCqListener#executeWithInitialResultsを呼び出すと、結果の件数が変化します。

                region.put(ELASTICSEARCH_BOOK.getIsbn(), ELASTICSEARCH_BOOK);
                TimeUnit.SECONDS.sleep(1L);

                CqResults<Struct> results = cqQuery.executeWithInitialResults();
                assertThat(results).hasSize(1);
                assertThat(((String)results.asList().get(0).get("key")))
                        .isEqualTo("978-4048662024");
                assertThat(((Book)results.asList().get(0).get("value")).getPrice())
                        .isEqualTo(3024);

結果はStructとして取得でき、「key」と「value」でそれぞれ対応するキーと値が取得できます。

この時点では、CqListenerにはイベント送信は行われていません。CqListener#executeWithInitialResultsの呼び出し後から、受信が始まります。

もっとContinuous Query

とりあえずContinuous Queryを動かしてみたわけですが、もうちょっとバリエーションを見てみたいと思います。

その前に

Continuous QueryってClient/Server構成になるので、前回のテストの実行状態に依存してしまったりするのは避けたいので、ちょっと重いですが毎回Serverを起動しなおすことにします。

ZT Process Executorを使って、LocatorとServerを毎回作り直すことにしました。
src/test/java/org/littlewings/geode/cq/ContinuousQueryTest.java

package org.littlewings.geode.cq;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.query.CqAttributes;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqException;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.CqResults;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.RegionNotFoundException;
import com.gemstone.gemfire.cache.query.Struct;
import com.gemstone.gemfire.distributed.LocatorLauncher;
import com.gemstone.gemfire.distributed.ServerLauncher;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.zeroturnaround.exec.ProcessExecutor;
import org.zeroturnaround.exec.StartedProcess;
import org.zeroturnaround.exec.stream.slf4j.Slf4jStream;

import static org.assertj.core.api.Assertions.assertThat;

public class ContinuousQueryTest {
    private static final Book SPRING_BOOT_BOOK = Book.create("978-4777518654", "はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発", 2700);
    private static final Book JAVA_EE_7_BOOK = Book.create("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104);
    private static final Book ELASTICSEARCH_BOOK = Book.create("978-4048662024", "高速スケーラブル検索エンジン ElasticSearch Server", 3024);

    private StartedProcess locatorProcess;
    private StartedProcess serverProcess;

    @Before
    public void setUp() throws IOException, InterruptedException {
        locatorProcess =
                new ProcessExecutor()
                        .command("mvn",
                                "exec:java",
                                "-Dexec.mainClass=" + LocatorLauncher.class.getName(),
                                "-Dexec.args=start",
                                "-Dgemfire.name=locator")
                        .redirectOutput(Slf4jStream.of(getClass()).asInfo())
                        .start();
        serverProcess =
                new ProcessExecutor()
                        .command("mvn",
                                "exec:java",
                                "-Dexec.mainClass=" + ServerLauncher.class.getName(),
                                "-Dexec.args=start",
                                "-Dgemfire.name=server",
                                "-Dgemfire.cache-xml-file=src/test/resources/server-cache.xml")
                        .redirectOutput(Slf4jStream.of(getClass()).asInfo())
                        .start();

        TimeUnit.SECONDS.sleep(10L);
    }

    @After
    public void tearDown() {
        serverProcess.getProcess().destroy();
        locatorProcess.getProcess().destroy();
    }

    // ここに、テストを書く!
}

Cache XMLファイルはあらかじめ用意して、Serverにシステムプロパティで指定するようにしています。
src/test/resources/server-cache.xml

<?xml version="1.0" encoding="UTF-8"?>
<cache
        xmlns="http://schema.pivotal.io/gemfire/cache"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-8.1.xsd"
        version="8.1">
    <region name="bookRegion" refid="PARTITION_REDUNDANT"/>
</cache>

この状態だと、Entityはクラスパス上に存在していることになるので、別途deployする必要はありません。とりあえず。

で、テストを書いて動かしてみます。クエリの範囲内に収まるエントリをひとつ、収まらないエントリを2つ投下。

    @Test
    public void simpleCase() throws CqException, RegionNotFoundException {
        try (ClientCache cache = new ClientCacheFactory().setPoolSubscriptionEnabled(true).create()) {
            Region<String, Book> region =
                    cache.<String, Book>createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("bookRegion");

            CqListener cqListener = new MyCqListenerImpl();

            try {
                CqAttributesFactory cqAttrFactory = new CqAttributesFactory();
                cqAttrFactory.addCqListener(cqListener);
                CqAttributes cqAttr = cqAttrFactory.create();

                QueryService queryService = cache.getQueryService();
                CqQuery cqQuery = queryService.newCq("SELECT * FROM /bookRegion b WHERE b.price > 3000 AND b.price < 4000", cqAttr);

                CqResults<Struct> results = cqQuery.executeWithInitialResults();
                assertThat(results).isEmpty();

                region.put(ELASTICSEARCH_BOOK.getIsbn(), ELASTICSEARCH_BOOK);
                region.put(SPRING_BOOT_BOOK.getIsbn(), SPRING_BOOT_BOOK);
                region.put(JAVA_EE_7_BOOK.getIsbn(), JAVA_EE_7_BOOK);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .hasSize(1);
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .isEmpty();
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .isEmpty();
            } finally {
                cqListener.close();
            }
        }
    }

これで、テストクラス単体で動かせそうです。

バリエーションを見てみよう

テストクラス単体で動かせるようになったので、もうちょっとContinuous Queryでエントリをどう更新すると、どうイベントが発生するかを見てみたいと思います。

とりあえず、大枠を埋めたコードを用意。

    @Test
    public void variation() throws CqException, RegionNotFoundException, InterruptedException {
        try (ClientCache cache = new ClientCacheFactory().setPoolSubscriptionEnabled(true).create()) {
            Region<String, Book> region =
                    cache.<String, Book>createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("bookRegion");

            CqListener cqListener = new MyCqListenerImpl();

            try {
                CqAttributesFactory cqAttrFactory = new CqAttributesFactory();
                cqAttrFactory.addCqListener(cqListener);
                CqAttributes cqAttr = cqAttrFactory.create();

                QueryService queryService = cache.getQueryService();
                CqQuery cqQuery = queryService.newCq("SELECT * FROM /bookRegion b WHERE b.price > 3000 AND b.price < 4000", cqAttr);

                CqResults<Struct> results = cqQuery.executeWithInitialResults();
                assertThat(results).isEmpty();

                // ここを埋める

            } finally {
                cqListener.close();
            }
        }
    }

コメント部を、順次埋めていきます。

クエリの条件は、今回もこのようにしています。3,000円より大きく、4,000円を下回る、と。

                CqQuery cqQuery = queryService.newCq("SELECT * FROM /bookRegion b WHERE b.price > 3000 AND b.price < 4000", cqAttr);

まずは、クエリの範囲外のエントリを登録。

                // クエリの範囲外
                region.put(SPRING_BOOT_BOOK.getIsbn(), SPRING_BOOT_BOOK);
                TimeUnit.SECONDS.sleep(1L);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .isEmpty();
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .isEmpty();
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .isEmpty();

何も起こりません。

クエリの範囲内の条件に更新してみます。

                // クエリの範囲内へ更新
                Book springBootBook = Book.create("978-4777518654", "はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発", 3200);
                region.put(springBootBook.getIsbn(), springBootBook);
                TimeUnit.SECONDS.sleep(1L);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .hasSize(1);  // 作成
                assertThat(((Book)((MyCqListenerImpl) cqListener).getCreatedEntries().get(0).get("value")).getPrice())
                        .isEqualTo(3200);
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .isEmpty();
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .isEmpty();

「INSERT」扱いになりました。

さらに、クエリの範囲内に収まるように更新してみましょう。

                // クエリの範囲内で更新
                springBootBook = Book.create("978-4777518654", "はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発", 3700);
                region.put(springBootBook.getIsbn(), springBootBook);
                TimeUnit.SECONDS.sleep(1L);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .hasSize(1);
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .hasSize(1);  // 更新
                assertThat(((Book)((MyCqListenerImpl) cqListener).getUpdatedEntries().get(0).get("value")).getPrice())
                        .isEqualTo(3700);
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .isEmpty();

「UPDATE」になりましたね。

クエリの範囲外に出るように更新してみます。

                // クエリの範囲外へ
                springBootBook = Book.create("978-4777518654", "はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発", 4200);
                region.put(springBootBook.getIsbn(), springBootBook);
                TimeUnit.SECONDS.sleep(1L);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .hasSize(1);
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .hasSize(1);
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .hasSize(1);  // 削除
                assertThat(((Book)((MyCqListenerImpl) cqListener).getDeletedEntries().get(0).get("value")).getPrice())
                        .isEqualTo(4200);

「DELETE」ということなりました。

再度、クエリの範囲内に入れ込みます。

                // クエリの範囲内へ更新
                springBootBook = Book.create("978-4777518654", "はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発", 3700);
                region.put(springBootBook.getIsbn(), springBootBook);
                TimeUnit.SECONDS.sleep(1L);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .hasSize(2);  // 作成
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .hasSize(1);
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .hasSize(1);

「INSERT」になりました。

明示的にremoveしてみます。

                // 削除
                region.remove(SPRING_BOOT_BOOK.getIsbn());
                TimeUnit.SECONDS.sleep(1L);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .hasSize(2);
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .hasSize(1);
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .hasSize(2);  // 削除

こちらも「DELETE」になります。

最後、もう1度クエリの範囲に収めてみます。

                // 再度クエリの範囲内で追加
                springBootBook = Book.create("978-4777518654", "はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発", 3200);
                region.put(springBootBook.getIsbn(), springBootBook);
                TimeUnit.SECONDS.sleep(1L);

                assertThat(((MyCqListenerImpl) cqListener).getCreatedEntries())
                        .hasSize(3);  // 作成
                assertThat(((MyCqListenerImpl) cqListener).getUpdatedEntries())
                        .hasSize(1);
                assertThat(((MyCqListenerImpl) cqListener).getDeletedEntries())
                        .hasSize(2);

「INSERT」になりましたね。

バリエーションを見ると

こういう感じでしょうか?

発生イベント
クエリの範囲外 クエリの範囲外 なにも起こらない
クエリの範囲外 クエリの範囲内 作成(INSERT)
クエリの範囲内 クエリの範囲内 更新(UPDATE)
クエリの範囲内 クエリの範囲外 削除(DELETE)
クエリの範囲内 削除(Region#remove) 削除(DELETE)

あとはExpireも考慮すべき、というような気はします。

まとめ

Apache Geode 1.0.0-incubating.M2から利用できるようになった、Continuous Queryを試してみました。

個人的には他のIn Memory Data Gridで試したことはあったので概念的にはそれほど戸惑いませんでしたが、Apache Geodeで使うにあたってのセットアップ的なところで若干つまづいたというか、Client/Serverで動作させるようにするのが面倒だったというか…まあ、そのくらいですね。

継続的に発生するデータ更新を、都度リアルタイムに見ていきたい場合に便利な機能なんだろうなーと思います。