Apache Geodeには、クライアント、もしくはサーバ側からFunctionを実行する機能があります。
http://geode.docs.pivotal.io/docs/developing/function_exec/chapter_overview.html
こちらのページやその他のドキュメントを見ると、以下のような特徴を持つようです。
- 呼び出すがFunctionが、特定のサーバー、メンバー、またはグループで動作するようにさせることができる
- Functionはサーバー上で実行され、呼び出し元に戻ってくる
- Functionは、各サーバーで1度初期化する必要がある。また、Functionは再度利用される可能性がある
- サーバーのデータ配置状況を考慮した実行ができるため、任意の集計演算を効率的に行うことができる
- データに依存した処理も、しない処理も記述可能
Apache Geode上で、Functionがどのような形態で実行されるかは、以下のドキュメントに記載があります。
http://geode.docs.pivotal.io/docs/developing/function_exec/how_function_execution_works.html
- 特定のメンバー、複数のメンバーを指定して実行(P2P)
- 特定のサーバー、複数のサーバーを指定して実行(Client/Server)
- メンバーグループを指定して実行
- データセット(Region)を指定して実行
これらの構成の図が示してあったり、HA(障害時にリトライする)についても記載があります。図については、わかりやすく書かれているので、見るとイメージしやすいかと思います。
それでは、今回はこのFunctionを使って遊んでみるとします。
利用するApache Geodeのバージョンは、1.0.0-incubating.M1とします。構成はPeer-to-Peer構成とし、Client/Server構成については今回は扱いません。
準備
まずは、Maven依存関係から。
<dependency> <groupId>org.apache.geode</groupId> <artifactId>gemfire-core</artifactId> <version>1.0.0-incubating.M1</version> </dependency> <dependency> <groupId>org.zeroturnaround</groupId> <artifactId>zt-exec</artifactId> <version>1.8</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.3.0</version> <scope>test</scope> </dependency>
Apache Geodeとテスト関係の依存関係を加えていますが、ちょっと理由があってZT Process Executorも使うことにします。
テストコードの雛形
Functionの定義といきたいところですが、最初にテストコードの雛形を記述します。
src/test/java/org/littlewings/geode/function/FunctionServiceTest.java
package org.littlewings.geode.function; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.stream.IntStream; import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.CacheFactory; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionShortcut; import com.gemstone.gemfire.cache.execute.Execution; import com.gemstone.gemfire.cache.execute.Function; import com.gemstone.gemfire.cache.execute.FunctionService; import com.gemstone.gemfire.cache.execute.ResultCollector; import com.gemstone.gemfire.distributed.DistributedMember; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; public class FunctionServiceTest { // ここに、テストを書く! }
利用するimport文は、こちらを参照してください。
はじめてのFunction定義
それでは、こちらのドキュメントを見ながらFunctionを書いていきたいと思います。
http://geode.docs.pivotal.io/docs/developing/function_exec/function_execution.html
最初は、「Hello World」的なものを書いてみました。
src/main/java/org/littlewings/geode/function/HelloWorldFunction.java
package org.littlewings.geode.function; import com.gemstone.gemfire.cache.execute.FunctionAdapter; import com.gemstone.gemfire.cache.execute.FunctionContext; public class HelloWorldFunction extends FunctionAdapter { @Override public void execute(FunctionContext fc) { fc.getResultSender().lastResult("Hello My Function!"); } @Override public String getId() { return getClass().getName(); } }
Functionインターフェースを実装したクラスを作成するのですが、今回はFunctionインターフェースのいくつかのメソッドを実装済みのFunctionAdapterクラスを継承することにしました。
FunctionAdapterを継承すると、以下のメソッドのデフォルト実装が決定します。
- Function#hasResultがtrueを返す(Function実行時に、Functionが結果を送信元にも戻す場合はtrue
- Function#optimizeForWriteがfalseを返す(trueにすると、FunctionService#onRegionでの書き込みの最適化)
- Funciton#isHAがtrueを返す(障害時にリランする)
あとは、executeとgetIdを実装すればOKです。
getIdにはFunctionのIDを設定しますが、今回はドキュメントのサンプル実装と同じくクラス名としました。
executeには、実装したい処理そのものを書きます。
@Override public void execute(FunctionContext fc) { fc.getResultSender().lastResult("Hello My Function!"); }
今回は、「Hello My Function!」と返すFunctionとしました。
では、テストを書いて動かしてみます。Nodeはひとつで実行です。
@Test public void functionServiceCall() { try (Cache cache = new CacheFactory().create()) { Region<String, String> region = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); Function function = new HelloWorldFunction(); FunctionService.registerFunction(function); Execution execution = FunctionService .onRegion(region); ResultCollector rc = execution.execute(function); List<String> results = (List<String>) rc.getResult(); assertThat(results) .hasSize(1) .containsOnly("Hello My Function!"); FunctionService.unregisterFunction(function.getId()); } }
Functionは、newしてFunctionServiceに登録します。
Function function = new HelloWorldFunction();
FunctionService.registerFunction(function);
次に、FunctionServiceからExecutionを取得します。今回はFunctionService#onRegionを使用していますが、他にonMemberやonServer(Client/Server)などもあります。
Execution execution = FunctionService .onRegion(region);
Execution#executeで、Functionを呼び出します。結果はResultCollectorという型で返されるので、ここから適切な型にキャストします。ResultCollectorのデフォルト実装では、ArrayListを戻す(結果をすべてメモリに持つ)ようです。
ResultCollector rc = execution.execute(function); List<String> results = (List<String>) rc.getResult(); assertThat(results) .hasSize(1) .containsOnly("Hello My Function!");
FunctionServiceからのFunctionの登録解除は、FunctionService#unregisterFunctionで行います。
FunctionService.unregisterFunction(function.getId());
この時は、FunctionのIDで解除する必要があります。
とりあえず、動かすことはできましたね。では、もっとサンプルを書いていきましょう。
Execution#execute時に、ID指定で呼び出す
先ほどのコード例では、Execution#executeを呼び出す際にFunctionのインスタンスを指定していましたが、IDを指定しての呼び出しも可能です。
@Test public void functionServiceCallById() { try (Cache cache = new CacheFactory().create()) { Region<String, String> region = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); FunctionService.registerFunction(new HelloWorldFunction()); String id = HelloWorldFunction.class.getName(); Execution execution = FunctionService .onRegion(region); ResultCollector rc = execution.execute(id); List<String> results = (List<String>) rc.getResult(); assertThat(results) .hasSize(1) .containsOnly("Hello My Function!"); FunctionService.unregisterFunction(id); } }
その他は先ほどと同じなので、割愛。
Cache XMLに登録済みのFunctionを呼び出す
Cache XMLで登録済みのFunctionを呼び出すこともできます。関数登録のコードは不要になり、ID指定でExecution#executeすればOKです。
@Test public void predefinedFunctionServiceCallById() { try (Cache cache = new CacheFactory().set("cache-xml-file", "cache-simple-predefined.xml").create()) { Region<String, String> region = cache.<String, String>getRegion("sampleRegion"); String id = HelloWorldDeclarableFunction.class.getName(); Execution execution = FunctionService .onRegion(region); ResultCollector rc = execution.execute(id); List<String> results = (List<String>) rc.getResult(); assertThat(results) .hasSize(1) .containsOnly("Hello My Declarable Function!"); FunctionService.unregisterFunction(id); } }
使用したCache XMLは、こちら。
rc/main/resources/cache-simple-predefined.xml
<?xml version="1.0" encoding="UTF-8"?> <cache xmlns="http://schema.pivotal.io/gemfire/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-8.1.xsd" version="8.1"> <region name="sampleRegion" refid="PARTITION"/> <function-service> <function> <class-name>org.littlewings.geode.function.HelloWorldDeclarableFunction</class-name> </function> </function-service> </cache>
function-serviceおよびfunction、class-nameタグで、登録したいFunctionのクラス名を指定します。
そして、登録しているクラスはこちら。
src/main/java/org/littlewings/geode/function/HelloWorldDeclarableFunction.java
package org.littlewings.geode.function; import java.util.Properties; import com.gemstone.gemfire.cache.Declarable; import com.gemstone.gemfire.cache.execute.FunctionAdapter; import com.gemstone.gemfire.cache.execute.FunctionContext; public class HelloWorldDeclarableFunction extends FunctionAdapter implements Declarable { @Override public void init(Properties props) { } @Override public void execute(FunctionContext fc) { fc.getResultSender().lastResult("Hello My Declarable Function!"); } @Override public String getId() { return getClass().getName(); } }
先ほどと違うのは、Declarableというインターフェースを実装し、initメソッドを定義していることです。initメソッドでは、今回は使用していませんがCache XML中で設定されたパラメーターがPropertiesとして渡されてくるようです。
動作自体はこれまでの2つとそれほど変わらないので、説明は割愛。
Functionに引数を渡す
Functionへは、呼び出し時に引数を渡すことができます。
Function側にも、引数を取得する実装を書く必要があります。サンプルは、こちら。
src/main/java/org/littlewings/geode/function/HelloArgsFunction.java
package org.littlewings.geode.function; import com.gemstone.gemfire.cache.execute.FunctionAdapter; import com.gemstone.gemfire.cache.execute.FunctionContext; public class HelloArgsFunction extends FunctionAdapter { @Override public void execute(FunctionContext fc) { Object argument = fc.getArguments(); fc.getResultSender().lastResult(String.format("Hello %s!", argument)); } @Override public String getId() { return getClass().getName(); } }
FunctionContext#getArgumentsで、実行時に渡された引数を取得することができます。
呼び出し側は、このようになります。
@Test public void functionServiceCallWithArgument() { try (Cache cache = new CacheFactory().create()) { Region<String, String> region = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); Function function = new HelloArgsFunction(); FunctionService.registerFunction(function); Execution execution = FunctionService .onRegion(region) .withArgs("World"); ResultCollector rc = execution.execute(function); List<String> results = (List<String>) rc.getResult(); assertThat(results) .hasSize(1) .containsOnly("Hello World!"); FunctionService.unregisterFunction(function.getId()); } }
Executionを取得する際に、withArgsで設定すればOKです。
Execution execution =
FunctionService
.onRegion(region)
.withArgs("World");
ResultSenderに複数の結果を与えて返す
ここまで書いてきたFunctionのサンプルはすべてResultSender#lastResultを使って実装していましたが、lastResultはFunctionが返す最後の値を示すものであり、他に返すものがあればResultSender#sendResultを使うと戻り値として追加することができます。
そのサンプル。
src/main/java/org/littlewings/geode/function/HelloMultipleResultFunction.java
package org.littlewings.geode.function; import com.gemstone.gemfire.cache.execute.FunctionAdapter; import com.gemstone.gemfire.cache.execute.FunctionContext; import com.gemstone.gemfire.cache.execute.ResultSender; public class HelloMultipleResultFunction extends FunctionAdapter { @Override public void execute(FunctionContext fc) { ResultSender<String> resultSender = fc.getResultSender(); resultSender.sendResult("Apache Geode"); resultSender.sendResult("is"); resultSender.lastResult("In Memory Data Grid."); } @Override public String getId() { return getClass().getName(); } }
ここでは、メッセージを3回に分けて返しています。なお、最後にlastResultを呼び出すことは必須で、sendResultだけしてlastResultを呼び出さなかった場合はエラーになります。
確認。
@Test public void functionServiceCallMultipleResult() { try (Cache cache = new CacheFactory().create()) { Region<String, String> region = cache.<String, String>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); Function function = new HelloMultipleResultFunction(); FunctionService.registerFunction(function); Execution execution = FunctionService .onRegion(region); ResultCollector rc = execution.execute(function); List<String> results = (List<String>) rc.getResult(); assertThat(results) .hasSize(3) .containsOnly("Apache Geode", "is", "In Memory Data Grid."); FunctionService.unregisterFunction(function.getId()); } }
結果が要素3つのListとして得られました。
クラスタ上で実行する
これまでは、Single NodeでFunctionを実行していましたが、今度はクラスタ上で実行したいと思います。
テストコードで書きながら試しているので、できればひとつのJavaVMで複数Nodeを起動したいところです。
ですが、どうもそれはできない様子。
java - Multiple cache creations in Gemfire - Stack Overflow
確かに、Singletonっぽいですね。
このため、GeodeのNodeを複数起動するには、複数JavaVMで実行する必要があります。
その用途で作った簡易サーバーが、こちら。
src/main/java/org/littlewings/geode/function/SimpleCacheServer.java
package org.littlewings.geode.function; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.Properties; import com.gemstone.gemfire.distributed.ServerLauncher; import org.zeroturnaround.exec.ProcessExecutor; import org.zeroturnaround.exec.StartedProcess; import org.zeroturnaround.exec.stream.slf4j.Slf4jStream; public class SimpleCacheServer { protected ProcessExecutor executor; protected StartedProcess process; protected SimpleCacheServer(ProcessExecutor executor) { this.executor = executor; } public static void main(String... args) throws IOException { String workDir = "./target/" + System.getProperty("gemfire.name"); Files.createDirectories(Paths.get(workDir)); ServerLauncher serverLauncher = new ServerLauncher.Builder() .setWorkingDirectory(workDir) .build(); serverLauncher.start(); } protected void start() { try { process = executor.start(); } catch (IOException e) { throw new RuntimeException(e); } } public void stop() { process.getProcess().destroy(); } public static SimpleCacheServer run(int port, Properties properties) { List<String> commands = new ArrayList<>(); commands.add("mvn"); commands.add("exec:java"); commands.add("-Dexec.mainClass=" + SimpleCacheServer.class.getName()); commands.add("-Dexec.args=" + port); properties.entrySet().forEach(kv -> commands.add("-D" + kv.getKey() + "=" + kv.getValue())); SimpleCacheServer server = new SimpleCacheServer(new ProcessExecutor() .redirectOutput(Slf4jStream.of(SimpleCacheServer.class).asInfo()) .command(commands)); server.start(); return server; } }
「mvn exec:java」で、強引にServerを起動します。設定についてはPropertiesで指定して、中身はシステムプロパティとしてmvn exec:javaの引数とします。
テストコード側には、このサーバーを起動するためのメソッドも実装。
protected <K, V> void withRegion(int numInstances, String cacheXmlFile, String regionName, BiConsumer<Cache, Region<K, V>> consumer) { List<SimpleCacheServer> servers = new ArrayList<>(); String locator = "localhost[10334]"; try (Cache cache = new CacheFactory() .set("name", "main-cache") .set("cache-xml-file", cacheXmlFile) .set("start-locator", locator) .create()) { Region<K, V> region = cache.<K, V>getRegion(regionName); IntStream .rangeClosed(1, numInstances - 1) .forEach(i -> { Properties properties = new Properties(); properties.setProperty("gemfire.name", "server" + i); properties.setProperty("gemfire.cache-xml-file", cacheXmlFile); properties.setProperty("gemfire.locators", locator); properties.setProperty("gemfire.start-locator", "localhost[" + (10334 + i) + "]"); servers.add(SimpleCacheServer.run((40404 + i), properties)); }); TimeUnit.SECONDS.sleep(20L); assertThat(cache.getMembers().size() + 1) .isEqualTo(numInstances); consumer.accept(cache, region); } catch (InterruptedException e) { fail(e.getMessage()); } finally { servers.forEach(SimpleCacheServer::stop); } }
他のNodeがちゃんと起動しているかどうかのマジメなエラートラップはしていないので、クラスタがうまく構成できているか、わずかばかりのチェックを入れています。
assertThat(cache.getMembers().size() + 1)
.isEqualTo(numInstances);
テストコードは、こちら。Functionは、最初のものに戻しています。
@Test public void functionServiceWithMultipleCache() { this.<String, String>withRegion(3, "cache-simple.xml", "sampleRegion", (cache, region) -> { Function function = new HelloWorldFunction(); FunctionService.registerFunction(function); Set<DistributedMember> members = new HashSet<>(); members.add(cache.getDistributedSystem().getDistributedMember()); // self members.addAll(cache.getMembers()); // others Execution execution = FunctionService.onMembers(members); ResultCollector rc = execution.execute(function); List<String> results = (List<String>) rc.getResult(); assertThat(results) .hasSize(3) .containsOnly("Hello My Function!", "Hello My Function!", "Hello My Function!"); FunctionService.unregisterFunction(function.getId()); }); }
使用しているCache XMLは、このような定義になります。
src/main/resources/cache-simple.xml
<?xml version="1.0" encoding="UTF-8"?> <cache xmlns="http://schema.pivotal.io/gemfire/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-8.1.xsd" version="8.1"> <region name="sampleRegion" refid="PARTITION"/> </cache>
今回のテストコードでは、自分自身と他のNodeを指定して、FunctionService#onMembersでExecutionを取得します。
Set<DistributedMember> members = new HashSet<>(); members.add(cache.getDistributedSystem().getDistributedMember()); // self members.addAll(cache.getMembers()); // others Execution execution = FunctionService.onMembers(members);
これで、クラスタ内の全Node上で処理を実行したことになります。
もしくは、以下のコードでも全Node上で実行してくれます。とはいえ、onRegionはホントはデータ配置に依存したコードを書くのが合っている気が(このあたりは後述します)。
Execution execution = FunctionService.onRegion(region);
結果は、全Nodeで実行されたので最初の3倍のデータが返ってきます。といっても、全部同じものですが。
assertThat(results) .hasSize(3) .containsOnly("Hello My Function!", "Hello My Function!", "Hello My Function!");
今回のコードでは、データにも引数にも依存しないFunctionなので、それぞれのNodeで実行した結果が返ってきたので約3倍になったということですね。
Regionを意識したFunctionを書く
ここまでは、Regionに登録したデータなどは特に関与せず、単にFunctionおよびFunctionに与えるArgumentで完結するサンプルを書いてきました。
今度は、Regionに登録したデータを利用して合算する、簡単なFunctionを書いてみたいと思います。
src/main/java/org/littlewings/geode/function/SummarizeRegionFunction.java
package org.littlewings.geode.function; import java.util.HashMap; import java.util.Map; import java.util.Set; import com.gemstone.gemfire.cache.execute.FunctionAdapter; import com.gemstone.gemfire.cache.execute.FunctionContext; import com.gemstone.gemfire.cache.execute.RegionFunctionContext; import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; public class SummarizeRegionFunction extends FunctionAdapter { @Override public void execute(FunctionContext fc) { RegionFunctionContext context = (RegionFunctionContext) fc; Set<?> keys = context.getFilter(); int sum = keys .stream() .reduce(0, (acc, key) -> (acc + ((Integer) PartitionRegionHelper.getLocalDataForContext(context).get(key))), (acc1, acc2) -> acc1 + acc2); context.getResultSender().lastResult(sum); } @Override public String getId() { return getClass().getName(); } }
先ほどまでのサンプルでも呼び出し時に使っていましたが、FunctionService#onRegionを経由すると、FuntionContextをRegionFunctionContextにキャストできるようになります。すると、RegionFunctionContextから呼び出し元から渡された、キー集合を得ることができるようになります。
RegionFunctionContext context = (RegionFunctionContext) fc; Set<?> keys = context.getFilter();
このキー集合を元に、Regionからデータを取得して合算して結果としました。
int sum = keys .stream() .reduce(0, (acc, key) -> (acc + ((Integer) PartitionRegionHelper.getLocalDataForContext(context).get(key))), (acc1, acc2) -> acc1 + acc2); context.getResultSender().lastResult(sum);
PartitionRegionHelperを使用すると、Node内のローカルなDataSet(Region)を得ることが可能なようです。
なお、今回は使いませんでしたが、Region自体を得ることも可能です。RegionFunctionContext#getDataSetを使用します。
Region<?, ?> region = context.getDataSet();
テストコード。
@Test public void regionFunction() { try (Cache cache = new CacheFactory().create()) { Region<String, Integer> region = cache.<String, Integer>createRegionFactory(RegionShortcut.PARTITION).create("sampleRegion"); IntStream .rangeClosed(1, 100) .forEach(i -> region.put("key" + i, i)); Function function = new SummarizeRegionFunction(); FunctionService.registerFunction(function); Set<String> keys = region.keySet(); Execution execution = FunctionService .onRegion(region) .withFilter(keys); ResultCollector rc = execution.execute(function); List<Integer> results = (List<Integer>) rc.getResult(); assertThat(results) .hasSize(1) .containsOnly(5050); FunctionService.unregisterFunction(function.getId()); } }
先ほどまでと異なる点は、キー集合をExecution#withFilterで設定しているところですね。これが、RegionFunctionContext#getFilterで取得できるキー集合になります。
Set<String> keys = region.keySet(); Execution execution = FunctionService .onRegion(region) .withFilter(keys);
クラスタ上で実行し、各NodeのローカルなRegionに合わせた集計処理を行う
最後は、Region上のデータを合算する先ほどのFunctionを少し変更して、クラスタ上で実行してみたいと思います。
変更後、というか先ほどのFunctionをベースに作った新しいFunctionは、こちら。
src/main/java/org/littlewings/geode/function/SummarizeAndAssignedSizeRegionFunction.java >|java| package org.littlewings.geode.function; import java.util.HashMap; import java.util.Map; import java.util.Set; import com.gemstone.gemfire.cache.execute.FunctionAdapter; import com.gemstone.gemfire.cache.execute.FunctionContext; import com.gemstone.gemfire.cache.execute.RegionFunctionContext; import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; public class SummarizeAndAssignedSizeRegionFunction extends FunctionAdapter { @Override public void execute(FunctionContext fc) { RegionFunctionContext context = (RegionFunctionContext) fc; Set<?> keys = context.getFilter(); int sum = keys .stream() .reduce(0, (acc, key) -> (acc + ((Integer) PartitionRegionHelper.getLocalDataForContext(context).get(key))), (acc1, acc2) -> acc1 + acc2); Map<String, Integer> result = new HashMap<>(); result.put("assignedKeySize", keys.size()); result.put("sum", sum); context.getResultSender().lastResult(result); } @Override public String getId() { return getClass().getName(); } }
やっていることはだいたい一緒なのですが、今回はFunctionの戻り値を変更して、割り当てられたキーの数も返すようにしています。
Map<String, Integer> result = new HashMap<>(); result.put("assignedKeySize", keys.size()); result.put("sum", sum); context.getResultSender().lastResult(result);
ここでの確認ポイントは、割り当てられたキー集合が、各NodeのローカルなRegionに合わせた形(いわゆる、データローカリティ的な)になっていることを確認します。
なお、RegionFunctionContext#getFilterの説明はこのような形になっています。
Returns subset of keys (filter) provided by the invoking thread (aka routing objects). The set of filter keys are locally present in the datastore on the executing cluster member.
テストコードは、このように。
@Test public void regionFunctionWithMultipleCache() { this.<String, Integer>withRegion(3, "cache-simple.xml", "sampleRegion", (cache, region) -> { IntStream .rangeClosed(1, 100) .forEach(i -> region.put("key" + i, i)); Function function = new SummarizeAndAssignedSizeRegionFunction(); FunctionService.registerFunction(function); Set<String> keys = region.keySet(); Execution execution = FunctionService .onRegion(region) .withFilter(keys); ResultCollector rc = execution.execute(function); List<Map<String, Integer>> results = (List<Map<String, Integer>>) rc.getResult(); assertThat(results) .hasSize(3); IntStream .range(0, results.size()) .forEach(i -> { assertThat(results.get(i).get("assignedKeySize")) .isGreaterThan(30) .isLessThan(80); assertThat(results.get(i).get("sum")) .isGreaterThan(1000) .isLessThan(5000); }); int regionTotalAssignedKeySize = results.stream().mapToInt(map -> map.get("assignedKeySize")).sum(); assertThat(regionTotalAssignedKeySize) .isEqualTo(100); int regionSum = results.stream().mapToInt(map -> map.get("sum")).sum(); assertThat(regionSum) .isEqualTo(5050); FunctionService.unregisterFunction(function.getId()); }); }
ここでのCache XMLも、先ほどのクラスタでの実行例と同様、こちらです。
src/main/resources/cache-simple.xml
<?xml version="1.0" encoding="UTF-8"?> <cache xmlns="http://schema.pivotal.io/gemfire/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-8.1.xsd" version="8.1"> <region name="sampleRegion" refid="PARTITION"/> </cache>
まずは各Nodeで実行されたFunctionが返した値ですが、割り当てられたキーの数や、合算した値が個々で見ると登録した数・合計値を下回っているものの、何かしら割り当てられていることは確認できます。
IntStream .range(0, results.size()) .forEach(i -> { assertThat(results.get(i).get("assignedKeySize")) .isGreaterThan(30) .isLessThan(80); assertThat(results.get(i).get("sum")) .isGreaterThan(1000) .isLessThan(5000); });
で、全Nodeの結果を合わせると、トータルになると。
int regionTotalAssignedKeySize = results.stream().mapToInt(map -> map.get("assignedKeySize")).sum(); assertThat(regionTotalAssignedKeySize) .isEqualTo(100); int regionSum = results.stream().mapToInt(map -> map.get("sum")).sum(); assertThat(regionSum) .isEqualTo(5050);
正確には、Nodeとキーの配置状況と、Functionが実行されたNodeと割り当てられたキーの関係を見ないといけないのですが、今回はそれっぽいのでいいということに…。
まとめ
Apache GeodeでのFunctionを使って、単に固定値を戻すFunction、引数のあるFunction、Regionに登録されたデータを利用するFunction、クラスタ上での実行と試してみました。
カスタムなResultCollectorの作成や、メンバーグループの指定など試していない機能もありますが、まずは使えたということで。