CLOVER🍀

That was when it all began.

Apache GeodeのFunctionを試す

Apache Geodeには、クライアント、もしくはサーバ側からFunctionを実行する機能があります。

http://geode.docs.pivotal.io/docs/developing/function_exec/chapter_overview.html

こちらのページやその他のドキュメントを見ると、以下のような特徴を持つようです。

  • 呼び出すがFunctionが、特定のサーバー、メンバー、またはグループで動作するようにさせることができる
  • Functionはサーバー上で実行され、呼び出し元に戻ってくる
  • Functionは、各サーバーで1度初期化する必要がある。また、Functionは再度利用される可能性がある
  • サーバーのデータ配置状況を考慮した実行ができるため、任意の集計演算を効率的に行うことができる
  • データに依存した処理も、しない処理も記述可能

Apache Geode上で、Functionがどのような形態で実行されるかは、以下のドキュメントに記載があります。

http://geode.docs.pivotal.io/docs/developing/function_exec/how_function_execution_works.html

  • 特定のメンバー、複数のメンバーを指定して実行(P2P
  • 特定のサーバー、複数のサーバーを指定して実行(Client/Server)
  • メンバーグループを指定して実行
  • データセット(Region)を指定して実行

これらの構成の図が示してあったり、HA(障害時にリトライする)についても記載があります。図については、わかりやすく書かれているので、見るとイメージしやすいかと思います。

それでは、今回はこのFunctionを使って遊んでみるとします。

利用するApache Geodeバージョンは、1.0.0-incubating.M1とします。構成はPeer-to-Peer構成とし、Client/Server構成については今回は扱いません。

準備

まずは、Maven依存関係から。

        <dependency>
            <groupId>org.apache.geode</groupId>
            <artifactId>gemfire-core</artifactId>
            <version>1.0.0-incubating.M1</version>
        </dependency>

        <dependency>
            <groupId>org.zeroturnaround</groupId>
            <artifactId>zt-exec</artifactId>
            <version>1.8</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.3.0</version>
            <scope>test</scope>
        </dependency>

Apache Geodeとテスト関係の依存関係を加えていますが、ちょっと理由があってZT Process Executorも使うことにします。

テストコードの雛形

Functionの定義といきたいところですが、最初にテストコードの雛形を記述します。
src/test/java/org/littlewings/geode/function/FunctionServiceTest.java

package org.littlewings.geode.function;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.execute.Execution;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.distributed.DistributedMember;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;

public class FunctionServiceTest {
    // ここに、テストを書く!
}

利用するimport文は、こちらを参照してください。

はじめてのFunction定義

それでは、こちらのドキュメントを見ながらFunctionを書いていきたいと思います。

http://geode.docs.pivotal.io/docs/developing/function_exec/function_execution.html

最初は、「Hello World」的なものを書いてみました。
src/main/java/org/littlewings/geode/function/HelloWorldFunction.java

package org.littlewings.geode.function;

import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;

public class HelloWorldFunction extends FunctionAdapter {
    @Override
    public void execute(FunctionContext fc) {
        fc.getResultSender().lastResult("Hello My Function!");
    }

    @Override
    public String getId() {
        return getClass().getName();
    }
}

Functionインターフェースを実装したクラスを作成するのですが、今回はFunctionインターフェースのいくつかのメソッドを実装済みのFunctionAdapterクラスを継承することにしました。

FunctionAdapterを継承すると、以下のメソッドのデフォルト実装が決定します。

  • Function#hasResultがtrueを返す(Function実行時に、Functionが結果を送信元にも戻す場合はtrue
  • Function#optimizeForWriteがfalseを返す(trueにすると、FunctionService#onRegionでの書き込みの最適化)
  • Funciton#isHAがtrueを返す(障害時にリランする)

あとは、executeとgetIdを実装すればOKです。

getIdにはFunctionのIDを設定しますが、今回はドキュメントのサンプル実装と同じくクラス名としました。

executeには、実装したい処理そのものを書きます。

    @Override
    public void execute(FunctionContext fc) {
        fc.getResultSender().lastResult("Hello My Function!");
    }

今回は、「Hello My Function!」と返すFunctionとしました。

では、テストを書いて動かしてみます。Nodeはひとつで実行です。

    @Test
    public void functionServiceCall() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            Function function = new HelloWorldFunction();
            FunctionService.registerFunction(function);

            Execution execution =
                    FunctionService
                            .onRegion(region);

            ResultCollector rc = execution.execute(function);
            List<String> results = (List<String>) rc.getResult();

            assertThat(results)
                    .hasSize(1)
                    .containsOnly("Hello My Function!");

            FunctionService.unregisterFunction(function.getId());
        }
    }

Functionは、newしてFunctionServiceに登録します。

            Function function = new HelloWorldFunction();
            FunctionService.registerFunction(function);

次に、FunctionServiceからExecutionを取得します。今回はFunctionService#onRegionを使用していますが、他にonMemberやonServer(Client/Server)などもあります。

            Execution execution =
                    FunctionService
                            .onRegion(region);

Execution#executeで、Functionを呼び出します。結果はResultCollectorという型で返されるので、ここから適切な型にキャストします。ResultCollectorのデフォルト実装では、ArrayListを戻す(結果をすべてメモリに持つ)ようです。

            ResultCollector rc = execution.execute(function);
            List<String> results = (List<String>) rc.getResult();

            assertThat(results)
                    .hasSize(1)
                    .containsOnly("Hello My Function!");

FunctionServiceからのFunctionの登録解除は、FunctionService#unregisterFunctionで行います。

            FunctionService.unregisterFunction(function.getId());

この時は、FunctionのIDで解除する必要があります。

とりあえず、動かすことはできましたね。では、もっとサンプルを書いていきましょう。

Execution#execute時に、ID指定で呼び出す

先ほどのコード例では、Execution#executeを呼び出す際にFunctionのインスタンスを指定していましたが、IDを指定しての呼び出しも可能です。

    @Test
    public void functionServiceCallById() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            FunctionService.registerFunction(new HelloWorldFunction());
            String id = HelloWorldFunction.class.getName();

            Execution execution =
                    FunctionService
                            .onRegion(region);

            ResultCollector rc = execution.execute(id);
            List<String> results = (List<String>) rc.getResult();

            assertThat(results)
                    .hasSize(1)
                    .containsOnly("Hello My Function!");

            FunctionService.unregisterFunction(id);
        }
    }

その他は先ほどと同じなので、割愛。

Cache XMLに登録済みのFunctionを呼び出す

Cache XMLで登録済みのFunctionを呼び出すこともできます。関数登録のコードは不要になり、ID指定でExecution#executeすればOKです。

    @Test
    public void predefinedFunctionServiceCallById() {
        try (Cache cache = new CacheFactory().set("cache-xml-file", "cache-simple-predefined.xml").create()) {
            Region<String, String> region =
                    cache.<String, String>getRegion("sampleRegion");

            String id = HelloWorldDeclarableFunction.class.getName();

            Execution execution =
                    FunctionService
                            .onRegion(region);

            ResultCollector rc = execution.execute(id);
            List<String> results = (List<String>) rc.getResult();

            assertThat(results)
                    .hasSize(1)
                    .containsOnly("Hello My Declarable Function!");

            FunctionService.unregisterFunction(id);
        }
    }

使用したCache XMLは、こちら。
rc/main/resources/cache-simple-predefined.xml

<?xml version="1.0" encoding="UTF-8"?>
<cache
        xmlns="http://schema.pivotal.io/gemfire/cache"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-8.1.xsd"
        version="8.1">
    <region name="sampleRegion" refid="PARTITION"/>
    <function-service>
        <function>
            <class-name>org.littlewings.geode.function.HelloWorldDeclarableFunction</class-name>
        </function>
    </function-service>
</cache>

function-serviceおよびfunction、class-nameタグで、登録したいFunctionのクラス名を指定します。

そして、登録しているクラスはこちら。
src/main/java/org/littlewings/geode/function/HelloWorldDeclarableFunction.java

package org.littlewings.geode.function;

import java.util.Properties;

import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;

public class HelloWorldDeclarableFunction extends FunctionAdapter implements Declarable {
    @Override
    public void init(Properties props) {
    }

    @Override
    public void execute(FunctionContext fc) {
        fc.getResultSender().lastResult("Hello My Declarable Function!");
    }

    @Override
    public String getId() {
        return getClass().getName();
    }
}

先ほどと違うのは、Declarableというインターフェースを実装し、initメソッドを定義していることです。initメソッドでは、今回は使用していませんがCache XML中で設定されたパラメーターがPropertiesとして渡されてくるようです。

動作自体はこれまでの2つとそれほど変わらないので、説明は割愛。

Functionに引数を渡す

Functionへは、呼び出し時に引数を渡すことができます。

Function側にも、引数を取得する実装を書く必要があります。サンプルは、こちら。
src/main/java/org/littlewings/geode/function/HelloArgsFunction.java

package org.littlewings.geode.function;

import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;

public class HelloArgsFunction extends FunctionAdapter {
    @Override
    public void execute(FunctionContext fc) {
        Object argument = fc.getArguments();
        fc.getResultSender().lastResult(String.format("Hello %s!", argument));
    }

    @Override
    public String getId() {
        return getClass().getName();
    }
}

FunctionContext#getArgumentsで、実行時に渡された引数を取得することができます。

呼び出し側は、このようになります。

    @Test
    public void functionServiceCallWithArgument() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            Function function = new HelloArgsFunction();
            FunctionService.registerFunction(function);

            Execution execution =
                    FunctionService
                            .onRegion(region)
                            .withArgs("World");

            ResultCollector rc = execution.execute(function);
            List<String> results = (List<String>) rc.getResult();

            assertThat(results)
                    .hasSize(1)
                    .containsOnly("Hello World!");

            FunctionService.unregisterFunction(function.getId());
        }
    }

Executionを取得する際に、withArgsで設定すればOKです。

            Execution execution =
                    FunctionService
                            .onRegion(region)
                            .withArgs("World");

ResultSenderに複数の結果を与えて返す

ここまで書いてきたFunctionのサンプルはすべてResultSender#lastResultを使って実装していましたが、lastResultはFunctionが返す最後の値を示すものであり、他に返すものがあればResultSender#sendResultを使うと戻り値として追加することができます。

そのサンプル。
src/main/java/org/littlewings/geode/function/HelloMultipleResultFunction.java

package org.littlewings.geode.function;

import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.ResultSender;

public class HelloMultipleResultFunction extends FunctionAdapter {
    @Override
    public void execute(FunctionContext fc) {
        ResultSender<String> resultSender = fc.getResultSender();
        resultSender.sendResult("Apache Geode");
        resultSender.sendResult("is");
        resultSender.lastResult("In Memory Data Grid.");
    }

    @Override
    public String getId() {
        return getClass().getName();
    }
}

ここでは、メッセージを3回に分けて返しています。なお、最後にlastResultを呼び出すことは必須で、sendResultだけしてlastResultを呼び出さなかった場合はエラーになります。

確認。

    @Test
    public void functionServiceCallMultipleResult() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, String> region =
                    cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            Function function = new HelloMultipleResultFunction();
            FunctionService.registerFunction(function);

            Execution execution =
                    FunctionService
                            .onRegion(region);

            ResultCollector rc = execution.execute(function);
            List<String> results = (List<String>) rc.getResult();

            assertThat(results)
                    .hasSize(3)
                    .containsOnly("Apache Geode", "is", "In Memory Data Grid.");

            FunctionService.unregisterFunction(function.getId());
        }
    }

結果が要素3つのListとして得られました。

クラスタ上で実行する

これまでは、Single NodeでFunctionを実行していましたが、今度はクラスタ上で実行したいと思います。

テストコードで書きながら試しているので、できればひとつのJavaVMで複数Nodeを起動したいところです。

ですが、どうもそれはできない様子。

java - Multiple cache creations in Gemfire - Stack Overflow

確かに、Singletonっぽいですね。

https://github.com/apache/incubator-geode/blob/develop/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java#L706-L709

このため、GeodeのNodeを複数起動するには、複数JavaVMで実行する必要があります。

その用途で作った簡易サーバーが、こちら。
src/main/java/org/littlewings/geode/function/SimpleCacheServer.java

package org.littlewings.geode.function;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.gemstone.gemfire.distributed.ServerLauncher;
import org.zeroturnaround.exec.ProcessExecutor;
import org.zeroturnaround.exec.StartedProcess;
import org.zeroturnaround.exec.stream.slf4j.Slf4jStream;

public class SimpleCacheServer {
    protected ProcessExecutor executor;
    protected StartedProcess process;

    protected SimpleCacheServer(ProcessExecutor executor) {
        this.executor = executor;
    }

    public static void main(String... args) throws IOException {
        String workDir = "./target/" + System.getProperty("gemfire.name");
        Files.createDirectories(Paths.get(workDir));

        ServerLauncher serverLauncher =
                new ServerLauncher.Builder()
                        .setWorkingDirectory(workDir)
                        .build();
        serverLauncher.start();
    }

    protected void start() {
        try {
            process = executor.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void stop() {
        process.getProcess().destroy();
    }

    public static SimpleCacheServer run(int port, Properties properties) {
        List<String> commands = new ArrayList<>();
        commands.add("mvn");
        commands.add("exec:java");
        commands.add("-Dexec.mainClass=" + SimpleCacheServer.class.getName());
        commands.add("-Dexec.args=" + port);
        properties.entrySet().forEach(kv -> commands.add("-D" + kv.getKey() + "=" + kv.getValue()));

        SimpleCacheServer server =
                new SimpleCacheServer(new ProcessExecutor()
                        .redirectOutput(Slf4jStream.of(SimpleCacheServer.class).asInfo())
                        .command(commands));
        server.start();
        return server;
    }
}

「mvn exec:java」で、強引にServerを起動します。設定についてはPropertiesで指定して、中身はシステムプロパティとしてmvn exec:javaの引数とします。

テストコード側には、このサーバーを起動するためのメソッドも実装。

    protected <K, V> void withRegion(int numInstances, String cacheXmlFile, String regionName, BiConsumer<Cache, Region<K, V>> consumer) {
        List<SimpleCacheServer> servers = new ArrayList<>();

        String locator = "localhost[10334]";
        try (Cache cache =
                     new CacheFactory()
                             .set("name", "main-cache")
                             .set("cache-xml-file", cacheXmlFile)
                             .set("start-locator", locator)
                             .create()) {
            Region<K, V> region = cache.<K, V>getRegion(regionName);

            IntStream
                    .rangeClosed(1, numInstances - 1)
                    .forEach(i -> {
                        Properties properties = new Properties();
                        properties.setProperty("gemfire.name", "server" + i);
                        properties.setProperty("gemfire.cache-xml-file", cacheXmlFile);
                        properties.setProperty("gemfire.locators", locator);
                        properties.setProperty("gemfire.start-locator", "localhost[" + (10334 + i) + "]");
                        servers.add(SimpleCacheServer.run((40404 + i), properties));
                    });

            TimeUnit.SECONDS.sleep(20L);

            assertThat(cache.getMembers().size() + 1)
                    .isEqualTo(numInstances);

            consumer.accept(cache, region);
        } catch (InterruptedException e) {
            fail(e.getMessage());
        } finally {
            servers.forEach(SimpleCacheServer::stop);
        }
    }

他のNodeがちゃんと起動しているかどうかのマジメなエラートラップはしていないので、クラスタがうまく構成できているか、わずかばかりのチェックを入れています。

            assertThat(cache.getMembers().size() + 1)
                    .isEqualTo(numInstances);

テストコードは、こちら。Functionは、最初のものに戻しています。

    @Test
    public void functionServiceWithMultipleCache() {
        this.<String, String>withRegion(3, "cache-simple.xml", "sampleRegion", (cache, region) -> {
            Function function = new HelloWorldFunction();
            FunctionService.registerFunction(function);

            Set<DistributedMember> members = new HashSet<>();
            members.add(cache.getDistributedSystem().getDistributedMember());  // self
            members.addAll(cache.getMembers());  // others

            Execution execution = FunctionService.onMembers(members);

            ResultCollector rc = execution.execute(function);
            List<String> results = (List<String>) rc.getResult();

            assertThat(results)
                    .hasSize(3)
                    .containsOnly("Hello My Function!", "Hello My Function!", "Hello My Function!");

            FunctionService.unregisterFunction(function.getId());
        });
    }

使用しているCache XMLは、このような定義になります。
src/main/resources/cache-simple.xml

<?xml version="1.0" encoding="UTF-8"?>
<cache
        xmlns="http://schema.pivotal.io/gemfire/cache"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-8.1.xsd"
        version="8.1">
    <region name="sampleRegion" refid="PARTITION"/>
</cache>

今回のテストコードでは、自分自身と他のNodeを指定して、FunctionService#onMembersでExecutionを取得します。

            Set<DistributedMember> members = new HashSet<>();
            members.add(cache.getDistributedSystem().getDistributedMember());  // self
            members.addAll(cache.getMembers());  // others

            Execution execution = FunctionService.onMembers(members);

これで、クラスタ内の全Node上で処理を実行したことになります。

もしくは、以下のコードでも全Node上で実行してくれます。とはいえ、onRegionはホントはデータ配置に依存したコードを書くのが合っている気が(このあたりは後述します)。

            Execution execution = FunctionService.onRegion(region);

結果は、全Nodeで実行されたので最初の3倍のデータが返ってきます。といっても、全部同じものですが。

            assertThat(results)
                    .hasSize(3)
                    .containsOnly("Hello My Function!", "Hello My Function!", "Hello My Function!");

今回のコードでは、データにも引数にも依存しないFunctionなので、それぞれのNodeで実行した結果が返ってきたので約3倍になったということですね。

Regionを意識したFunctionを書く

ここまでは、Regionに登録したデータなどは特に関与せず、単にFunctionおよびFunctionに与えるArgumentで完結するサンプルを書いてきました。

今度は、Regionに登録したデータを利用して合算する、簡単なFunctionを書いてみたいと思います。
src/main/java/org/littlewings/geode/function/SummarizeRegionFunction.java

package org.littlewings.geode.function;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;

public class SummarizeRegionFunction extends FunctionAdapter {
    @Override
    public void execute(FunctionContext fc) {
        RegionFunctionContext context = (RegionFunctionContext) fc;

        Set<?> keys = context.getFilter();

        int sum =
                keys
                        .stream()
                        .reduce(0,
                                (acc, key) -> (acc + ((Integer) PartitionRegionHelper.getLocalDataForContext(context).get(key))),
                                (acc1, acc2) -> acc1 + acc2);

        context.getResultSender().lastResult(sum);
    }

    @Override
    public String getId() {
        return getClass().getName();
    }
}

先ほどまでのサンプルでも呼び出し時に使っていましたが、FunctionService#onRegionを経由すると、FuntionContextをRegionFunctionContextにキャストできるようになります。すると、RegionFunctionContextから呼び出し元から渡された、キー集合を得ることができるようになります。

        RegionFunctionContext context = (RegionFunctionContext) fc;

        Set<?> keys = context.getFilter();

このキー集合を元に、Regionからデータを取得して合算して結果としました。

        int sum =
                keys
                        .stream()
                        .reduce(0,
                                (acc, key) -> (acc + ((Integer) PartitionRegionHelper.getLocalDataForContext(context).get(key))),
                                (acc1, acc2) -> acc1 + acc2);

        context.getResultSender().lastResult(sum);

PartitionRegionHelperを使用すると、Node内のローカルなDataSet(Region)を得ることが可能なようです。

なお、今回は使いませんでしたが、Region自体を得ることも可能です。RegionFunctionContext#getDataSetを使用します。

        Region<?, ?> region = context.getDataSet();

テストコード。

    @Test
    public void regionFunction() {
        try (Cache cache = new CacheFactory().create()) {
            Region<String, Integer> region =
                    cache.<String, Integer>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion");

            IntStream
                    .rangeClosed(1, 100)
                    .forEach(i -> region.put("key" + i, i));

            Function function = new SummarizeRegionFunction();
            FunctionService.registerFunction(function);

            Set<String> keys = region.keySet();

            Execution execution =
                    FunctionService
                            .onRegion(region)
                            .withFilter(keys);

            ResultCollector rc = execution.execute(function);
            List<Integer> results = (List<Integer>) rc.getResult();

            assertThat(results)
                    .hasSize(1)
                    .containsOnly(5050);

            FunctionService.unregisterFunction(function.getId());
        }
    }

先ほどまでと異なる点は、キー集合をExecution#withFilterで設定しているところですね。これが、RegionFunctionContext#getFilterで取得できるキー集合になります。

            Set<String> keys = region.keySet();

            Execution execution =
                    FunctionService
                            .onRegion(region)
                            .withFilter(keys);

クラスタ上で実行し、各NodeのローカルなRegionに合わせた集計処理を行う

最後は、Region上のデータを合算する先ほどのFunctionを少し変更して、クラスタ上で実行してみたいと思います。

変更後、というか先ほどのFunctionをベースに作った新しいFunctionは、こちら。

src/main/java/org/littlewings/geode/function/SummarizeAndAssignedSizeRegionFunction.java
>|java|
package org.littlewings.geode.function;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;

public class SummarizeAndAssignedSizeRegionFunction extends FunctionAdapter {
    @Override
    public void execute(FunctionContext fc) {
        RegionFunctionContext context = (RegionFunctionContext) fc;

        Set<?> keys = context.getFilter();

        int sum =
                keys
                        .stream()
                        .reduce(0,
                                (acc, key) -> (acc + ((Integer) PartitionRegionHelper.getLocalDataForContext(context).get(key))),
                                (acc1, acc2) -> acc1 + acc2);

        Map<String, Integer> result = new HashMap<>();
        result.put("assignedKeySize", keys.size());
        result.put("sum", sum);

        context.getResultSender().lastResult(result);
    }

    @Override
    public String getId() {
        return getClass().getName();
    }
}

やっていることはだいたい一緒なのですが、今回はFunctionの戻り値を変更して、割り当てられたキーの数も返すようにしています。

        Map<String, Integer> result = new HashMap<>();
        result.put("assignedKeySize", keys.size());
        result.put("sum", sum);

        context.getResultSender().lastResult(result);

ここでの確認ポイントは、割り当てられたキー集合が、各NodeのローカルなRegionに合わせた形(いわゆる、データローカリティ的な)になっていることを確認します。

なお、RegionFunctionContext#getFilterの説明はこのような形になっています。

Returns subset of keys (filter) provided by the invoking thread (aka routing objects). The set of filter keys are locally present in the datastore on the executing cluster member.

テストコードは、このように。

    @Test
    public void regionFunctionWithMultipleCache() {
        this.<String, Integer>withRegion(3, "cache-simple.xml", "sampleRegion", (cache, region) -> {
            IntStream
                    .rangeClosed(1, 100)
                    .forEach(i -> region.put("key" + i, i));

            Function function = new SummarizeAndAssignedSizeRegionFunction();
            FunctionService.registerFunction(function);

            Set<String> keys = region.keySet();

            Execution execution =
                    FunctionService
                            .onRegion(region)
                            .withFilter(keys);

            ResultCollector rc = execution.execute(function);
            List<Map<String, Integer>> results = (List<Map<String, Integer>>) rc.getResult();

            assertThat(results)
                    .hasSize(3);

            IntStream
                    .range(0, results.size())
                    .forEach(i -> {
                        assertThat(results.get(i).get("assignedKeySize"))
                                .isGreaterThan(30)
                                .isLessThan(80);
                        assertThat(results.get(i).get("sum"))
                                .isGreaterThan(1000)
                                .isLessThan(5000);
                    });

            int regionTotalAssignedKeySize = results.stream().mapToInt(map -> map.get("assignedKeySize")).sum();
            assertThat(regionTotalAssignedKeySize)
                    .isEqualTo(100);

            int regionSum = results.stream().mapToInt(map -> map.get("sum")).sum();
            assertThat(regionSum)
                    .isEqualTo(5050);

            FunctionService.unregisterFunction(function.getId());
        });
    }

ここでのCache XMLも、先ほどのクラスタでの実行例と同様、こちらです。
src/main/resources/cache-simple.xml

<?xml version="1.0" encoding="UTF-8"?>
<cache
        xmlns="http://schema.pivotal.io/gemfire/cache"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-8.1.xsd"
        version="8.1">
    <region name="sampleRegion" refid="PARTITION"/>
</cache>

まずは各Nodeで実行されたFunctionが返した値ですが、割り当てられたキーの数や、合算した値が個々で見ると登録した数・合計値を下回っているものの、何かしら割り当てられていることは確認できます。

            IntStream
                    .range(0, results.size())
                    .forEach(i -> {
                        assertThat(results.get(i).get("assignedKeySize"))
                                .isGreaterThan(30)
                                .isLessThan(80);
                        assertThat(results.get(i).get("sum"))
                                .isGreaterThan(1000)
                                .isLessThan(5000);
                    });

で、全Nodeの結果を合わせると、トータルになると。

            int regionTotalAssignedKeySize = results.stream().mapToInt(map -> map.get("assignedKeySize")).sum();
            assertThat(regionTotalAssignedKeySize)
                    .isEqualTo(100);

            int regionSum = results.stream().mapToInt(map -> map.get("sum")).sum();
            assertThat(regionSum)
                    .isEqualTo(5050);

正確には、Nodeとキーの配置状況と、Functionが実行されたNodeと割り当てられたキーの関係を見ないといけないのですが、今回はそれっぽいのでいいということに…。

まとめ

Apache GeodeでのFunctionを使って、単に固定値を戻すFunction、引数のあるFunction、Regionに登録されたデータを利用するFunction、クラスタ上での実行と試してみました。

カスタムなResultCollectorの作成や、メンバーグループの指定など試していない機能もありますが、まずは使えたということで。