Apache Geodeには、Partitioned Regionに格納するデータの配置をカスタマイズする機能があります。
Custom-Partitioning/Colocation Dataとして、ドキュメントに記載があります。
平たく言うと、データ・アフィニティとか、データのグルーピング的な感じで扱われる概念ですね。
Custom-Partitioning/Colocation Data
どんなもの?
Apache Geodeは新規エントリーの格納時に、デフォルトではキーの内容に応じてどのBucketに割り当てるのかを決定します。Custom Partitioningを使うと、この挙動をコントロールできるようになるようです。
この機能を使うと、関連するデータを同じRegionに集めたりすることができるようになり、Function実行時のパフォーマンス改善などが見込めると。
Custom-Partitioningには、2つの実現方法があります。
Standard custom partitioning
標準的な方法で、データのグルーピングについては指定できますが、どのBucketに配置されるかを指定することはできません。Geodeは、いつも指定したデータのグループが同じBucketにあるように維持し続けますが、ロードバランシングによって別のBucketに移動させることがあります。
Fixed custom partitioning
Standard custom partitioningに加えて、データをどのメンバーが保持するか、明確に指定するタイプのPartitioningです。エントリをどのBucketに割り当てるか、PrimaryとSecondaryをどのようにするかを指定することができます。
このため、このPartitioningではデータの配置を完全にコントロールすることができます。データを、特定の物理マシンに配置するようにすることなども可能になります。
ただし、これには次のトレードオフがあります。
- Geodeは固定化されたPartitionをリバランスすることができなくなるため、想定外のデータ量とならないよう注意してデータをロードする必要がある
- 各メンバーで異なるConfigurationとなる。PartitionResolver(後述)はPrimaryのIDを戻すことになり、SecondaryのIDを戻すことはない
後者がちょっとわかりにくいのですが、この機能を使うためにPartitionResolverというものを実装するのですが、これがいつもPrimaryのIDを返すことになり、Secondaryが配置場所の解決の際には出てこないよ、と言っているみたいです。
使ってみる
それでは、こちらを参照しながらCustom-Partitioning/Colocation Dataを試してみるとしましょう。
今回は、Standard custom partitioningを試してみたいと思います。Fixed custom partitioningは、ちょっといいかなぁと…。
Standard custom partitioningを使うには、PartitionResolverインターフェースを実装したクラスを用意し、次のいずれかの方法を取る必要があります。
※ちなみに、Fixed custom partitioningの場合はFixedPartitionResolverインターフェースを実装する必要があるようです
- Region作成時にCustom Classとして登録する
- エントリーのキーとして登録する
- Cache callbackとして登録する
3つ目はなんだろう?と思いましたが、要はListenerですね。Javadocを見ると、確かにCacheCallbackインターフェースを拡張しています。
PartitionResolverでは、getNameおよびgetRoutingObject、そしてcloseメソッドを実装します。
closeは、CacheCallbackから引き継いだものです。エントリーの配置は、getRoutingObjectメソッドでコントロールします。
では、用意をしつつプログラムを書いていきます。
準備
まずは、Maven依存関係から。
<dependency> <groupId>org.apache.geode</groupId> <artifactId>geode-lucene</artifactId> <version>1.0.0-incubating.M2</version> </dependency> <dependency> <groupId>org.zeroturnaround</groupId> <artifactId>zt-exec</artifactId> <version>1.9</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.5.2</version> <scope>test</scope> </dependency>
Apache Geodeがあるのはもちろんですが、テストコード用にJUnitとAssertJ、そしてプログラム内でクラスタを組むためにZT Process Executorを入れています。
gemfire.propertiesは用意しますが、中身は空とします。
src/test/resources/gemfire.properties
cache.xmlも使いますが、中身はまた後で。
src/test/resources/cache.xml
<?xml version="1.0" encoding="UTF-8"?> <cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd" version="1.0"> 〜後で〜 </cache>
単純なApache Geode Server
今回、エントリーのメンバー間での配置をコントロールするので、複数メンバーでクラスタを構成して確認することになります。
このために、簡単にServerを起動できるプログラムを用意しました。
src/main/java/org/littlewings/geode/colocatedata/SimpleCacheServer.java
package org.littlewings.geode.colocatedata; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.Properties; import com.gemstone.gemfire.distributed.ServerLauncher; import org.zeroturnaround.exec.ProcessExecutor; import org.zeroturnaround.exec.StartedProcess; import org.zeroturnaround.exec.stream.slf4j.Slf4jStream; public class SimpleCacheServer { protected ProcessExecutor executor; protected StartedProcess process; protected SimpleCacheServer(ProcessExecutor executor) { this.executor = executor; } public static void main(String... args) throws IOException { String workDir = "./target/" + System.getProperty("gemfire.name"); Files.createDirectories(Paths.get(workDir)); ServerLauncher serverLauncher = new ServerLauncher.Builder() .setWorkingDirectory(workDir) .setServerPort(Integer.parseInt(args[0])) .build(); serverLauncher.start(); } protected void start() { try { process = executor.start(); } catch (IOException e) { throw new RuntimeException(e); } } public void stop() { process.getProcess().destroy(); } public static SimpleCacheServer run(int port, Properties properties) { List<String> commands = new ArrayList<>(); commands.add("mvn"); commands.add("exec:java"); commands.add("-Dexec.mainClass=" + SimpleCacheServer.class.getName()); commands.add("-Dexec.args=" + port); properties.entrySet().forEach(kv -> commands.add("-D" + kv.getKey() + "=" + kv.getValue())); SimpleCacheServer server = new SimpleCacheServer(new ProcessExecutor() .redirectOutput(Slf4jStream.of(SimpleCacheServer.class).asInfo()) .command(commands)); server.start(); return server; } }
これを、テストコードから呼び出してクラスタを構成します。
テストコードの雛形
テストコードの全体像は、こんな感じです。
src/test/java/org/littlewings/geode/colocatedata/ColocateDateTest.java
package org.littlewings.geode.colocatedata; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.CacheFactory; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; import com.gemstone.gemfire.distributed.DistributedMember; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; public class ColocateDateTest { // ここに、テストを書く! protected <K, V> void withRegion(int numInstances, String cacheXmlFile, String regionName, BiConsumer<Cache, Region<K, V>> consumer) { int serverPort = 40404; int locatorPort = 10334; String locators = "localhost[" + locatorPort + "]"; List<SimpleCacheServer> servers = new ArrayList<>(); try (Cache cache = new CacheFactory() .set("name", "main-cache") .set("cache-xml-file", cacheXmlFile) .set("start-locator", locators) .create()) { Region<K, V> region = cache.<K, V>getRegion(regionName); IntStream .rangeClosed(1, numInstances - 1) .forEach(i -> { Properties properties = new Properties(); properties.setProperty("gemfire.name", "server" + i); properties.setProperty("gemfire.cache-xml-file", cacheXmlFile); properties.setProperty("gemfire.locators", "localhost[10334]"); properties.setProperty("gemfire.start-locator", "localhost[" + (locatorPort + i) + "]"); servers.add(SimpleCacheServer.run((serverPort + i), properties)); }); TimeUnit.SECONDS.sleep(15L); assertThat(cache.getMembers().size() + 1) .isEqualTo(numInstances); consumer.accept(cache, region); } catch (InterruptedException e) { fail(e.getMessage()); } finally { servers.forEach(SimpleCacheServer::stop); } } }
先ほど用意したServer用のプログラムを使用して、簡単にですがクラスタを構成します。
では、PartitionResolverを実装していきます。
お題
今回、Regionとしては意味的には
Region<String, String>
で考えます。
この時、キーを
"key-A-x" "key-B-x"
のように用意して、「key-A」のグループと「key-B」のグループに分けてみたいと思います。
そもそも、なにもしないとどうなるか
PartitionResolverを特に設定しないデフォルトの状態で、テストコードを書いてみます。クラスタに参加するメンバーは、3とします。
@Test public void simpleUsage() { this.<String, String>withRegion(3, "src/test/resources/cache.xml", "simpleRegion", (cache, region) -> { IntStream.rangeClosed(1, 5).forEach(i -> region.put("key-A-" + i, "value" + i)); IntStream.rangeClosed(6, 10).forEach(i -> region.put("key-B-" + i, "value" + i)); Set<DistributedMember> keyAMembers = IntStream .rangeClosed(1, 5) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-A-" + i)) .collect(Collectors.toSet()); Set<DistributedMember> keyBMembers = IntStream .rangeClosed(6, 10) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-B-" + i)) .collect(Collectors.toSet()); assertThat(keyAMembers.size()) .isGreaterThan(1); assertThat(keyBMembers.size()) .isGreaterThan(1); }); }
この時のRegionの設定は、こちら。
<region name="simpleRegion" refid="PARTITION_REDUNDANT"/>
結果、特にグルーピングはされず、「key-A〜」、「key-B〜」で格納されたエントリーが、複数のメンバーに割り当てられます。バラけている状態ですね。
エントリーのキーとしてPartitionResolverを使う
では、PartitionResolverを使う最初のサンプルとして、エントリーのキーをPartitionResolverを使う例を挙げたいと思います。
このケースでは、PartitionResolverがRegionにputするキーとなります。Regionで扱うため、Serializableである必要があります。
使いどころとしては、エントリーのキーとするクラスを自分でコントロールできる場合でしょうか。
src/main/java/org/littlewings/geode/colocatedata/GroupingKey.java
package org.littlewings.geode.colocatedata; import java.io.Serializable; import com.gemstone.gemfire.cache.EntryOperation; import com.gemstone.gemfire.cache.PartitionResolver; public class GroupingKey implements PartitionResolver, Serializable { private String prefix; private int number; public GroupingKey(String prefix, int number) { this.prefix = prefix; this.number = number; } @Override public Object getRoutingObject(EntryOperation opDetails) { return prefix; } @Override public String getName() { return getClass().getSimpleName(); } @Override public void close() { // no-op } }
今回は、グルーピングする単位をprefixとして受け取り
public GroupingKey(String prefix, int number) { this.prefix = prefix; this.number = number; }
prefixをgetRoutingObjectメソッドで返却する実装としました。
@Override public Object getRoutingObject(EntryOperation opDetails) { return prefix; }
確認してみましょう。
@Test public void usingPartitionResolverKey() { this.<GroupingKey, String>withRegion(3, "src/test/resources/cache.xml", "groupingKeyRegion", (cache, region) -> { IntStream.rangeClosed(1, 5).forEach(i -> region.put(new GroupingKey("key-A", i), "value" + i)); IntStream.rangeClosed(6, 10).forEach(i -> region.put(new GroupingKey("key-B", i), "value" + i)); Set<DistributedMember> keyAMembers = IntStream .rangeClosed(1, 5) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, new GroupingKey("key-A", i))) .collect(Collectors.toSet()); Set<DistributedMember> keyBMembers = IntStream .rangeClosed(6, 10) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, new GroupingKey("key-B", i))) .collect(Collectors.toSet()); assertThat(keyAMembers.size()) .isEqualTo(1); assertThat(keyBMembers.size()) .isEqualTo(1); }); }
この時のRegionの設定は、こちら。
<region name="groupingKeyRegion" refid="PARTITION_REDUNDANT"/>
先ほど作成したGroupingKeyを、Region#putする際のキーとして使用します。
IntStream.rangeClosed(1, 5).forEach(i -> region.put(new GroupingKey("key-A", i), "value" + i)); IntStream.rangeClosed(6, 10).forEach(i -> region.put(new GroupingKey("key-B", i), "value" + i));
各キーに対応するメンバーを保持する、Primaryなメンバーを集めます。
Set<DistributedMember> keyAMembers = IntStream .rangeClosed(1, 5) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, new GroupingKey("key-A", i))) .collect(Collectors.toSet()); Set<DistributedMember> keyBMembers = IntStream .rangeClosed(6, 10) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, new GroupingKey("key-B", i))) .collect(Collectors.toSet());
すると、それぞれ見事にユニークになりました。
assertThat(keyAMembers.size()) .isEqualTo(1); assertThat(keyBMembers.size()) .isEqualTo(1);
グルーピングして配置できた感じですね。
PartitionResolverを、Custom Classとして登録する
続いて、PartitionResolverを、Custom Classとして登録するパターンを扱います。このパターンは、キーに使うクラス自体をコントロールできない場合などに使うのでしょう。
この例で用意したPartitionResolverの実装は、こちらです。
src/main/java/org/littlewings/geode/colocatedata/GroupingKeyResolver.java
package org.littlewings.geode.colocatedata; import java.util.Properties; import com.gemstone.gemfire.cache.Declarable; import com.gemstone.gemfire.cache.EntryOperation; import com.gemstone.gemfire.cache.PartitionResolver; public class GroupingKeyResolver implements PartitionResolver, Declarable { @Override public void init(Properties props) { // no-op } @Override public Object getRoutingObject(EntryOperation opDetails) { String key = (String) opDetails.getKey(); System.out.println("Grouping Key = " + key.substring(0, key.indexOf('-', 5))); return key.substring(0, key.indexOf('-', 5)); // "key-N" } @Override public String getName() { return getClass().getSimpleName(); } @Override public void close() { // no-op } }
設定ファイルで登録するつもりで書いたので、Declarableインターフェースを実装しています。
getRoutingObjectメソッドの実装方法ですが、引数のEntryOperationからキーが取得できるので、ここではStringのキーを「key-A」や「key-B」の形に切り取って返すことにします。
@Override public Object getRoutingObject(EntryOperation opDetails) { String key = (String) opDetails.getKey(); System.out.println("Grouping Key = " + key.substring(0, key.indexOf('-', 5))); return key.substring(0, key.indexOf('-', 5)); // "key-N" }
このPartitionResolverを使ったテストコードは、こちら。
@Test public void withPartitionResolver() { this.<String, String>withRegion(3, "src/test/resources/cache.xml", "withResolverRegion", (cache, region) -> { IntStream.rangeClosed(1, 5).forEach(i -> region.put("key-A-" + i, "value" + i)); IntStream.rangeClosed(6, 10).forEach(i -> region.put("key-B-" + i, "value" + i)); Set<DistributedMember> keyAMembers = IntStream .rangeClosed(1, 5) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-A-" + i)) .collect(Collectors.toSet()); Set<DistributedMember> keyBMembers = IntStream .rangeClosed(6, 10) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-B-" + i)) .collect(Collectors.toSet()); assertThat(keyAMembers.size()) .isEqualTo(1); assertThat(keyBMembers.size()) .isEqualTo(1); }); }
パッと見はなんの変哲もない感じですが、ちゃんとグルーピングされています。
assertThat(keyAMembers.size()) .isEqualTo(1); assertThat(keyBMembers.size()) .isEqualTo(1);
cache.xmlではどうなっているかというと、attributeとしてpartition-resolverで設定しています。
<region name="withResolverRegion" refid="PARTITION_REDUNDANT"> <region-attributes> <partition-attributes> <partition-resolver> <class-name>org.littlewings.geode.colocatedata.GroupingKeyResolver</class-name> </partition-resolver> </partition-attributes> </region-attributes> </region>
こちらも確認できましたね。
Cache callbackとして使う
くどくなりそうなので、今回は省略…。
異なるRegionのデータの配置をコントロールする場合
最後は、複数の異なるRegionのデータの配置をコントロールする場合を試してみます。
デフォルトでは、Partitioned Regionで配置されるデータの場所は、他のRegionに依存しないそうですが、この挙動を変更できるようです。これを使うと、複数のRegionで関連するデータをまとめることができ、クエリやFunctionの実行の高速化などが期待できるようになります。
関連するRegionのPartitionResolverは、同じメカニズムでrouting objectを決定する必要があるようです(そりゃあそうだという気はしますが)。
まずは、2つの関連しないRegionについて、同じPartitionResolverを使って確認してみましょう。cache.xmlでは、このような定義とします。同じPartitionResolverを、Custom Classとして設定しています。
<region name="withResolverRegion" refid="PARTITION_REDUNDANT"> <region-attributes> <partition-attributes> <partition-resolver> <class-name>org.littlewings.geode.colocatedata.GroupingKeyResolver</class-name> </partition-resolver> </partition-attributes> </region-attributes> </region> <region name="withResolverRegion-NonRelated" refid="PARTITION_REDUNDANT"> <region-attributes> <partition-attributes> <partition-resolver> <class-name>org.littlewings.geode.colocatedata.GroupingKeyResolver</class-name> </partition-resolver> </partition-attributes> </region-attributes> </region>
テストコード。簡易的に、「key-A〜」、「key-B〜」のキーを格納するRegionを用意し、もうひとつのRegionには「key-A-X〜」、「key-B-Y〜」という形式でキーを格納するようにしました。prefix的に、「key-A」および「key-B」で始まるデータはまとめたいところです。
@Test public void colocateNonRelatedRegion() { this.<String, String>withRegion(3, "src/test/resources/cache.xml", "withResolverRegion", (cache, region) -> { Region<String, String> nonRelatedRegion = cache.getRegion("withResolverRegion-NonRelated"); IntStream.rangeClosed(1, 5).forEach(i -> region.put("key-A-" + i, "value" + i)); IntStream.rangeClosed(6, 10).forEach(i -> region.put("key-B-" + i, "value" + i)); IntStream.rangeClosed(50, 55).forEach(i -> nonRelatedRegion.put("key-A-X-" + i, "value" + i)); IntStream.rangeClosed(55, 60).forEach(i -> nonRelatedRegion.put("key-B-Y-" + i, "value" + i)); Set<DistributedMember> keyAMembers = IntStream .rangeClosed(1, 5) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-A-" + i)) .collect(Collectors.toSet()); Set<DistributedMember> keyBMembers = IntStream .rangeClosed(6, 10) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-B-" + i)) .collect(Collectors.toSet()); Set<DistributedMember> keyAXMembers = IntStream .rangeClosed(1, 5) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(nonRelatedRegion, "key-A-X-" + i)) .collect(Collectors.toSet()); Set<DistributedMember> keyBYMembers = IntStream .rangeClosed(6, 10) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(nonRelatedRegion, "key-B-Y-" + i)) .collect(Collectors.toSet()); assertThat(keyAMembers.size()) .isEqualTo(1); assertThat(keyBMembers.size()) .isEqualTo(1); assertThat(keyAXMembers.size()) .isEqualTo(1); assertThat(keyBYMembers.size()) .isEqualTo(1); assertThat(keyAMembers) .isNotEqualTo(keyAXMembers); assertThat(keyBMembers) .isNotEqualTo(keyBYMembers); }); }
それぞれグルーピングはされていますが、関連するデータは同じメンバーには集まっていません。
assertThat(keyAMembers) .isNotEqualTo(keyAXMembers); assertThat(keyBMembers) .isNotEqualTo(keyBYMembers);
これをまとめるようにしてみましょう。
設定自体は簡単で、cache.xmlで以下のように定義します。やっぱり、同じPartitionResolverをCustom Classとして設定します。
<region name="withResolverRegion" refid="PARTITION_REDUNDANT"> <region-attributes> <partition-attributes> <partition-resolver> <class-name>org.littlewings.geode.colocatedata.GroupingKeyResolver</class-name> </partition-resolver> </partition-attributes> </region-attributes> </region> <region name="withResolverRegion-Related" refid="PARTITION_REDUNDANT"> <region-attributes> <partition-attributes colocated-with="withResolverRegion"> <partition-resolver> <class-name>org.littlewings.geode.colocatedata.GroupingKeyResolver</class-name> </partition-resolver> </partition-attributes> </region-attributes> </region>
なにが変わったかというと、後ろにあるPartitioned Regionでは、関連するRegionをcolocated-with属性で指定するようになりました。
<region name="withResolverRegion-Related" refid="PARTITION_REDUNDANT"> <region-attributes> <partition-attributes colocated-with="withResolverRegion">
これで、2つのRegionで関連するデータを、それぞれ同じメンバーに集めることができます。
@Test public void colocateRelatedRegion() { this.<String, String>withRegion(3, "src/test/resources/cache.xml", "withResolverRegion", (cache, region) -> { Region<String, String> relatedRegion = cache.getRegion("withResolverRegion-Related"); IntStream.rangeClosed(1, 5).forEach(i -> region.put("key-A-" + i, "value" + i)); IntStream.rangeClosed(6, 10).forEach(i -> region.put("key-B-" + i, "value" + i)); IntStream.rangeClosed(50, 55).forEach(i -> relatedRegion.put("key-A-X-" + i, "value" + i)); IntStream.rangeClosed(55, 60).forEach(i -> relatedRegion.put("key-B-Y-" + i, "value" + i)); Set<DistributedMember> keyAMembers = IntStream .rangeClosed(1, 5) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-A-" + i)) .collect(Collectors.toSet()); Set<DistributedMember> keyBMembers = IntStream .rangeClosed(6, 10) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-B-" + i)) .collect(Collectors.toSet()); Set<DistributedMember> keyAXMembers = IntStream .rangeClosed(1, 5) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(relatedRegion, "key-A-X-" + i)) .collect(Collectors.toSet()); Set<DistributedMember> keyBYMembers = IntStream .rangeClosed(6, 10) .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(relatedRegion, "key-B-Y-" + i)) .collect(Collectors.toSet()); assertThat(keyAMembers.size()) .isEqualTo(1); assertThat(keyBMembers.size()) .isEqualTo(1); assertThat(keyAXMembers.size()) .isEqualTo(1); assertThat(keyBYMembers.size()) .isEqualTo(1); assertThat(keyAMembers) .isEqualTo(keyAXMembers); assertThat(keyBMembers) .isEqualTo(keyBYMembers); }); }
OKですね。
使うにはもう少し前提があって、
- Regionを定義する時には、先にデータの中心となるRegionを定義すること
- 関連させるPartitioned Regionは、partition attributesで指定する「recovery-delay」、「redundant-copies」、「startup-recovery-delay」、「total-num-buckets」は同じである必要がある
- 関連するRegionで、同じPartitionResolverを使うこと
- Regionのデータをディスクに永続化する場合は、各Regionが同じディスクにデータを保存するようにすること
といった感じみたいです。
まとめ
Apache GeodeのCustom-Partitioning/Colocation Dataを使って、データの配置(グルーピング)をコントロールする方法を見てみました。
このあたりは他のグリッドとそう変わらない概念だったので、あまり困ることはありませんでした。
が、完全に固定化できるとか、複数のRegionでもグルーピングをコントロールできるとは思っていなかったので、ここはちょっと驚きましたね。