CLOVER🍀

That was when it all began.

Apache GeodeのPartitionResolver(Custom-Partitioning/Colocation Data)を試す

Apache Geodeには、Partitioned Regionに格納するデータの配置をカスタマイズする機能があります。

Custom-Partitioning/Colocation Dataとして、ドキュメントに記載があります。

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/overview_custom_partitioning_and_data_colocation.html

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/custom_partitioning_and_data_colocation.html

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/using_custom_partition_resolvers.html

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/colocating_partitioned_region_data.html

平たく言うと、データ・アフィニティとか、データのグルーピング的な感じで扱われる概念ですね。

Custom-Partitioning/Colocation Data

どんなもの?

Apache Geodeは新規エントリーの格納時に、デフォルトではキーの内容に応じてどのBucketに割り当てるのかを決定します。Custom Partitioningを使うと、この挙動をコントロールできるようになるようです。

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/custom_partitioning_and_data_colocation.html

この機能を使うと、関連するデータを同じRegionに集めたりすることができるようになり、Function実行時のパフォーマンス改善などが見込めると。

Custom-Partitioningには、2つの実現方法があります。

Standard custom partitioning

標準的な方法で、データのグルーピングについては指定できますが、どのBucketに配置されるかを指定することはできません。Geodeは、いつも指定したデータのグループが同じBucketにあるように維持し続けますが、ロードバランシングによって別のBucketに移動させることがあります。

Fixed custom partitioning

Standard custom partitioningに加えて、データをどのメンバーが保持するか、明確に指定するタイプのPartitioningです。エントリをどのBucketに割り当てるか、PrimaryとSecondaryをどのようにするかを指定することができます。

このため、このPartitioningではデータの配置を完全にコントロールすることができます。データを、特定の物理マシンに配置するようにすることなども可能になります。

ただし、これには次のトレードオフがあります。

  • Geodeは固定化されたPartitionをリバランスすることができなくなるため、想定外のデータ量とならないよう注意してデータをロードする必要がある
  • 各メンバーで異なるConfigurationとなる。PartitionResolver(後述)はPrimaryのIDを戻すことになり、SecondaryのIDを戻すことはない

後者がちょっとわかりにくいのですが、この機能を使うためにPartitionResolverというものを実装するのですが、これがいつもPrimaryのIDを返すことになり、Secondaryが配置場所の解決の際には出てこないよ、と言っているみたいです。

Region間のData Colocation

Geodeは関連するデータを、複数のRegionにまたがって単一のメンバーで保持することができます。関連するデータを、同じメンバー、同じBucketで保持できるようです。リバランスの際にもそれは崩れない、と。

例えば、顧客のデータと関連する顧客のデータを、同じメンバーに集めることができるといった感じです。

使ってみる

それでは、こちらを参照しながらCustom-Partitioning/Colocation Dataを試してみるとしましょう。

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/using_custom_partition_resolvers.html

今回は、Standard custom partitioningを試してみたいと思います。Fixed custom partitioningは、ちょっといいかなぁと…。

Standard custom partitioningを使うには、PartitionResolverインターフェースを実装したクラスを用意し、次のいずれかの方法を取る必要があります。
※ちなみに、Fixed custom partitioningの場合はFixedPartitionResolverインターフェースを実装する必要があるようです

  • Region作成時にCustom Classとして登録する
  • エントリーのキーとして登録する
  • Cache callbackとして登録する

3つ目はなんだろう?と思いましたが、要はListenerですね。Javadocを見ると、確かにCacheCallbackインターフェースを拡張しています。

http://geode.incubator.apache.org/releases/latest/javadoc/com/gemstone/gemfire/cache/PartitionResolver.html

PartitionResolverでは、getNameおよびgetRoutingObject、そしてcloseメソッドを実装します。

closeは、CacheCallbackから引き継いだものです。エントリーの配置は、getRoutingObjectメソッドでコントロールします。

では、用意をしつつプログラムを書いていきます。

準備

まずは、Maven依存関係から。

        <dependency>
            <groupId>org.apache.geode</groupId>
            <artifactId>geode-lucene</artifactId>
            <version>1.0.0-incubating.M2</version>
        </dependency>

        <dependency>
            <groupId>org.zeroturnaround</groupId>
            <artifactId>zt-exec</artifactId>
            <version>1.9</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
            <scope>test</scope>
        </dependency>

Apache Geodeがあるのはもちろんですが、テストコード用にJUnitとAssertJ、そしてプログラム内でクラスタを組むためにZT Process Executorを入れています。

gemfire.propertiesは用意しますが、中身は空とします。
src/test/resources/gemfire.properties

cache.xmlも使いますが、中身はまた後で。
src/test/resources/cache.xml

<?xml version="1.0" encoding="UTF-8"?>
<cache
        xmlns="http://geode.apache.org/schema/cache"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://geode.apache.org/schema/cache
        http://geode.apache.org/schema/cache/cache-1.0.xsd"
        version="1.0">

〜後で〜
</cache>

単純なApache Geode Server

今回、エントリーのメンバー間での配置をコントロールするので、複数メンバークラスタを構成して確認することになります。

このために、簡単にServerを起動できるプログラムを用意しました。
src/main/java/org/littlewings/geode/colocatedata/SimpleCacheServer.java

package org.littlewings.geode.colocatedata;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.gemstone.gemfire.distributed.ServerLauncher;
import org.zeroturnaround.exec.ProcessExecutor;
import org.zeroturnaround.exec.StartedProcess;
import org.zeroturnaround.exec.stream.slf4j.Slf4jStream;

public class SimpleCacheServer {
    protected ProcessExecutor executor;
    protected StartedProcess process;

    protected SimpleCacheServer(ProcessExecutor executor) {
        this.executor = executor;
    }

    public static void main(String... args) throws IOException {
        String workDir = "./target/" + System.getProperty("gemfire.name");
        Files.createDirectories(Paths.get(workDir));

        ServerLauncher serverLauncher =
                new ServerLauncher.Builder()
                        .setWorkingDirectory(workDir)
                        .setServerPort(Integer.parseInt(args[0]))
                        .build();
        serverLauncher.start();
    }

    protected void start() {
        try {
            process = executor.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void stop() {
        process.getProcess().destroy();
    }

    public static SimpleCacheServer run(int port, Properties properties) {
        List<String> commands = new ArrayList<>();
        commands.add("mvn");
        commands.add("exec:java");
        commands.add("-Dexec.mainClass=" + SimpleCacheServer.class.getName());
        commands.add("-Dexec.args=" + port);
        properties.entrySet().forEach(kv -> commands.add("-D" + kv.getKey() + "=" + kv.getValue()));

        SimpleCacheServer server =
                new SimpleCacheServer(new ProcessExecutor()
                        .redirectOutput(Slf4jStream.of(SimpleCacheServer.class).asInfo())
                        .command(commands));
        server.start();
        return server;
    }
}

これを、テストコードから呼び出してクラスタを構成します。

テストコードの雛形

テストコードの全体像は、こんな感じです。
src/test/java/org/littlewings/geode/colocatedata/ColocateDateTest.java

package org.littlewings.geode.colocatedata;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.distributed.DistributedMember;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;

public class ColocateDateTest {
    // ここに、テストを書く!

    protected <K, V> void withRegion(int numInstances, String cacheXmlFile, String regionName, BiConsumer<Cache, Region<K, V>> consumer) {
        int serverPort = 40404;
        int locatorPort = 10334;
        String locators = "localhost[" + locatorPort + "]";

        List<SimpleCacheServer> servers = new ArrayList<>();

        try (Cache cache =
                     new CacheFactory()
                             .set("name", "main-cache")
                             .set("cache-xml-file", cacheXmlFile)
                             .set("start-locator", locators)
                             .create()) {
            Region<K, V> region = cache.<K, V>getRegion(regionName);

            IntStream
                    .rangeClosed(1, numInstances - 1)
                    .forEach(i -> {
                        Properties properties = new Properties();
                        properties.setProperty("gemfire.name", "server" + i);
                        properties.setProperty("gemfire.cache-xml-file", cacheXmlFile);
                        properties.setProperty("gemfire.locators", "localhost[10334]");
                        properties.setProperty("gemfire.start-locator", "localhost[" + (locatorPort + i) + "]");
                        servers.add(SimpleCacheServer.run((serverPort + i), properties));
                    });

            TimeUnit.SECONDS.sleep(15L);

            assertThat(cache.getMembers().size() + 1)
                    .isEqualTo(numInstances);

            consumer.accept(cache, region);
        } catch (InterruptedException e) {
            fail(e.getMessage());
        } finally {
            servers.forEach(SimpleCacheServer::stop);
        }
    }
}

先ほど用意したServer用のプログラムを使用して、簡単にですがクラスタを構成します。

では、PartitionResolverを実装していきます。

お題

今回、Regionとしては意味的には

Region<String, String>

で考えます。

この時、キーを

"key-A-x"
"key-B-x"

のように用意して、「key-A」のグループと「key-B」のグループに分けてみたいと思います。

そもそも、なにもしないとどうなるか

PartitionResolverを特に設定しないデフォルトの状態で、テストコードを書いてみます。クラスタに参加するメンバーは、3とします。

    @Test
    public void simpleUsage() {
        this.<String, String>withRegion(3, "src/test/resources/cache.xml", "simpleRegion", (cache, region) -> {
            IntStream.rangeClosed(1, 5).forEach(i -> region.put("key-A-" + i, "value" + i));
            IntStream.rangeClosed(6, 10).forEach(i -> region.put("key-B-" + i, "value" + i));

            Set<DistributedMember> keyAMembers =
                    IntStream
                            .rangeClosed(1, 5)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-A-" + i))
                            .collect(Collectors.toSet());
            Set<DistributedMember> keyBMembers =
                    IntStream
                            .rangeClosed(6, 10)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-B-" + i))
                            .collect(Collectors.toSet());

            assertThat(keyAMembers.size())
                    .isGreaterThan(1);
            assertThat(keyBMembers.size())
                    .isGreaterThan(1);
        });
    }

この時のRegionの設定は、こちら。

    <region name="simpleRegion" refid="PARTITION_REDUNDANT"/>

結果、特にグルーピングはされず、「key-A〜」、「key-B〜」で格納されたエントリーが、複数のメンバーに割り当てられます。バラけている状態ですね。

エントリーのキーとしてPartitionResolverを使う

では、PartitionResolverを使う最初のサンプルとして、エントリーのキーをPartitionResolverを使う例を挙げたいと思います。

このケースでは、PartitionResolverがRegionにputするキーとなります。Regionで扱うため、Serializableである必要があります。
使いどころとしては、エントリーのキーとするクラスを自分でコントロールできる場合でしょうか。
src/main/java/org/littlewings/geode/colocatedata/GroupingKey.java

package org.littlewings.geode.colocatedata;

import java.io.Serializable;

import com.gemstone.gemfire.cache.EntryOperation;
import com.gemstone.gemfire.cache.PartitionResolver;

public class GroupingKey implements PartitionResolver, Serializable {
    private String prefix;
    private int number;

    public GroupingKey(String prefix, int number) {
        this.prefix = prefix;
        this.number = number;
    }

    @Override
    public Object getRoutingObject(EntryOperation opDetails) {
        return prefix;
    }

    @Override
    public String getName() {
        return getClass().getSimpleName();
    }

    @Override
    public void close() {
        // no-op
    }
}

今回は、グルーピングする単位をprefixとして受け取り

    public GroupingKey(String prefix, int number) {
        this.prefix = prefix;
        this.number = number;
    }

prefixをgetRoutingObjectメソッドで返却する実装としました。

    @Override
    public Object getRoutingObject(EntryOperation opDetails) {
        return prefix;
    }

確認してみましょう。

    @Test
    public void usingPartitionResolverKey() {
        this.<GroupingKey, String>withRegion(3, "src/test/resources/cache.xml", "groupingKeyRegion", (cache, region) -> {
            IntStream.rangeClosed(1, 5).forEach(i -> region.put(new GroupingKey("key-A", i), "value" + i));
            IntStream.rangeClosed(6, 10).forEach(i -> region.put(new GroupingKey("key-B", i), "value" + i));

            Set<DistributedMember> keyAMembers =
                    IntStream
                            .rangeClosed(1, 5)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, new GroupingKey("key-A", i)))
                            .collect(Collectors.toSet());
            Set<DistributedMember> keyBMembers =
                    IntStream
                            .rangeClosed(6, 10)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, new GroupingKey("key-B", i)))
                            .collect(Collectors.toSet());

            assertThat(keyAMembers.size())
                    .isEqualTo(1);
            assertThat(keyBMembers.size())
                    .isEqualTo(1);
        });
    }

この時のRegionの設定は、こちら。

    <region name="groupingKeyRegion" refid="PARTITION_REDUNDANT"/>

先ほど作成したGroupingKeyを、Region#putする際のキーとして使用します。

            IntStream.rangeClosed(1, 5).forEach(i -> region.put(new GroupingKey("key-A", i), "value" + i));
            IntStream.rangeClosed(6, 10).forEach(i -> region.put(new GroupingKey("key-B", i), "value" + i));

各キーに対応するメンバーを保持する、Primaryなメンバーを集めます。

            Set<DistributedMember> keyAMembers =
                    IntStream
                            .rangeClosed(1, 5)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, new GroupingKey("key-A", i)))
                            .collect(Collectors.toSet());
            Set<DistributedMember> keyBMembers =
                    IntStream
                            .rangeClosed(6, 10)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, new GroupingKey("key-B", i)))
                            .collect(Collectors.toSet());

すると、それぞれ見事にユニークになりました。

            assertThat(keyAMembers.size())
                    .isEqualTo(1);
            assertThat(keyBMembers.size())
                    .isEqualTo(1);

グルーピングして配置できた感じですね。

PartitionResolverを、Custom Classとして登録する

続いて、PartitionResolverを、Custom Classとして登録するパターンを扱います。このパターンは、キーに使うクラス自体をコントロールできない場合などに使うのでしょう。

この例で用意したPartitionResolverの実装は、こちらです。
src/main/java/org/littlewings/geode/colocatedata/GroupingKeyResolver.java

package org.littlewings.geode.colocatedata;

import java.util.Properties;

import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.EntryOperation;
import com.gemstone.gemfire.cache.PartitionResolver;

public class GroupingKeyResolver implements PartitionResolver, Declarable {
    @Override
    public void init(Properties props) {
        // no-op
    }

    @Override
    public Object getRoutingObject(EntryOperation opDetails) {
        String key = (String) opDetails.getKey();
        System.out.println("Grouping Key = " + key.substring(0, key.indexOf('-', 5)));
        return key.substring(0, key.indexOf('-', 5));  // "key-N"
    }

    @Override
    public String getName() {
        return getClass().getSimpleName();
    }

    @Override
    public void close() {
        // no-op
    }
}

設定ファイルで登録するつもりで書いたので、Declarableインターフェースを実装しています。

getRoutingObjectメソッドの実装方法ですが、引数のEntryOperationからキーが取得できるので、ここではStringのキーを「key-A」や「key-B」の形に切り取って返すことにします。

    @Override
    public Object getRoutingObject(EntryOperation opDetails) {
        String key = (String) opDetails.getKey();
        System.out.println("Grouping Key = " + key.substring(0, key.indexOf('-', 5)));
        return key.substring(0, key.indexOf('-', 5));  // "key-N"
    }

このPartitionResolverを使ったテストコードは、こちら。

    @Test
    public void withPartitionResolver() {
        this.<String, String>withRegion(3, "src/test/resources/cache.xml", "withResolverRegion", (cache, region) -> {
            IntStream.rangeClosed(1, 5).forEach(i -> region.put("key-A-" + i, "value" + i));
            IntStream.rangeClosed(6, 10).forEach(i -> region.put("key-B-" + i, "value" + i));

            Set<DistributedMember> keyAMembers =
                    IntStream
                            .rangeClosed(1, 5)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-A-" + i))
                            .collect(Collectors.toSet());
            Set<DistributedMember> keyBMembers =
                    IntStream
                            .rangeClosed(6, 10)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-B-" + i))
                            .collect(Collectors.toSet());

            assertThat(keyAMembers.size())
                    .isEqualTo(1);
            assertThat(keyBMembers.size())
                    .isEqualTo(1);
        });
    }

パッと見はなんの変哲もない感じですが、ちゃんとグルーピングされています。

            assertThat(keyAMembers.size())
                    .isEqualTo(1);
            assertThat(keyBMembers.size())
                    .isEqualTo(1);

cache.xmlではどうなっているかというと、attributeとしてpartition-resolverで設定しています。

    <region name="withResolverRegion" refid="PARTITION_REDUNDANT">
        <region-attributes>
            <partition-attributes>
                <partition-resolver>
                    <class-name>org.littlewings.geode.colocatedata.GroupingKeyResolver</class-name>
                </partition-resolver>
            </partition-attributes>
        </region-attributes>
    </region>

こちらも確認できましたね。

Cache callbackとして使う

くどくなりそうなので、今回は省略…。

異なるRegionのデータの配置をコントロールする場合

最後は、複数の異なるRegionのデータの配置をコントロールする場合を試してみます。

http://geode.docs.pivotal.io/docs/developing/partitioned_regions/colocating_partitioned_region_data.html

デフォルトでは、Partitioned Regionで配置されるデータの場所は、他のRegionに依存しないそうですが、この挙動を変更できるようです。これを使うと、複数のRegionで関連するデータをまとめることができ、クエリやFunctionの実行の高速化などが期待できるようになります。

関連するRegionのPartitionResolverは、同じメカニズムでrouting objectを決定する必要があるようです(そりゃあそうだという気はしますが)。

まずは、2つの関連しないRegionについて、同じPartitionResolverを使って確認してみましょう。cache.xmlでは、このような定義とします。同じPartitionResolverを、Custom Classとして設定しています。

    <region name="withResolverRegion" refid="PARTITION_REDUNDANT">
        <region-attributes>
            <partition-attributes>
                <partition-resolver>
                    <class-name>org.littlewings.geode.colocatedata.GroupingKeyResolver</class-name>
                </partition-resolver>
            </partition-attributes>
        </region-attributes>
    </region>

    <region name="withResolverRegion-NonRelated" refid="PARTITION_REDUNDANT">
        <region-attributes>
            <partition-attributes>
                <partition-resolver>
                    <class-name>org.littlewings.geode.colocatedata.GroupingKeyResolver</class-name>
                </partition-resolver>
            </partition-attributes>
        </region-attributes>
    </region>

テストコード。簡易的に、「key-A〜」、「key-B〜」のキーを格納するRegionを用意し、もうひとつのRegionには「key-A-X〜」、「key-B-Y〜」という形式でキーを格納するようにしました。prefix的に、「key-A」および「key-B」で始まるデータはまとめたいところです。

    @Test
    public void colocateNonRelatedRegion() {
        this.<String, String>withRegion(3, "src/test/resources/cache.xml", "withResolverRegion", (cache, region) -> {
            Region<String, String> nonRelatedRegion = cache.getRegion("withResolverRegion-NonRelated");

            IntStream.rangeClosed(1, 5).forEach(i -> region.put("key-A-" + i, "value" + i));
            IntStream.rangeClosed(6, 10).forEach(i -> region.put("key-B-" + i, "value" + i));

            IntStream.rangeClosed(50, 55).forEach(i -> nonRelatedRegion.put("key-A-X-" + i, "value" + i));
            IntStream.rangeClosed(55, 60).forEach(i -> nonRelatedRegion.put("key-B-Y-" + i, "value" + i));


            Set<DistributedMember> keyAMembers =
                    IntStream
                            .rangeClosed(1, 5)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-A-" + i))
                            .collect(Collectors.toSet());
            Set<DistributedMember> keyBMembers =
                    IntStream
                            .rangeClosed(6, 10)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-B-" + i))
                            .collect(Collectors.toSet());

            Set<DistributedMember> keyAXMembers =
                    IntStream
                            .rangeClosed(1, 5)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(nonRelatedRegion, "key-A-X-" + i))
                            .collect(Collectors.toSet());
            Set<DistributedMember> keyBYMembers =
                    IntStream
                            .rangeClosed(6, 10)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(nonRelatedRegion, "key-B-Y-" + i))
                            .collect(Collectors.toSet());

            assertThat(keyAMembers.size())
                    .isEqualTo(1);
            assertThat(keyBMembers.size())
                    .isEqualTo(1);
            assertThat(keyAXMembers.size())
                    .isEqualTo(1);
            assertThat(keyBYMembers.size())
                    .isEqualTo(1);

            assertThat(keyAMembers)
                    .isNotEqualTo(keyAXMembers);
            assertThat(keyBMembers)
                    .isNotEqualTo(keyBYMembers);
        });
    }

それぞれグルーピングはされていますが、関連するデータは同じメンバーには集まっていません。

            assertThat(keyAMembers)
                    .isNotEqualTo(keyAXMembers);
            assertThat(keyBMembers)
                    .isNotEqualTo(keyBYMembers);

これをまとめるようにしてみましょう。

設定自体は簡単で、cache.xmlで以下のように定義します。やっぱり、同じPartitionResolverをCustom Classとして設定します。

    <region name="withResolverRegion" refid="PARTITION_REDUNDANT">
        <region-attributes>
            <partition-attributes>
                <partition-resolver>
                    <class-name>org.littlewings.geode.colocatedata.GroupingKeyResolver</class-name>
                </partition-resolver>
            </partition-attributes>
        </region-attributes>
    </region>

    <region name="withResolverRegion-Related" refid="PARTITION_REDUNDANT">
        <region-attributes>
            <partition-attributes colocated-with="withResolverRegion">
                <partition-resolver>
                    <class-name>org.littlewings.geode.colocatedata.GroupingKeyResolver</class-name>
                </partition-resolver>
            </partition-attributes>
        </region-attributes>
    </region>

なにが変わったかというと、後ろにあるPartitioned Regionでは、関連するRegionをcolocated-with属性で指定するようになりました。

    <region name="withResolverRegion-Related" refid="PARTITION_REDUNDANT">
        <region-attributes>
            <partition-attributes colocated-with="withResolverRegion">

これで、2つのRegionで関連するデータを、それぞれ同じメンバーに集めることができます。

    @Test
    public void colocateRelatedRegion() {
        this.<String, String>withRegion(3, "src/test/resources/cache.xml", "withResolverRegion", (cache, region) -> {
            Region<String, String> relatedRegion = cache.getRegion("withResolverRegion-Related");

            IntStream.rangeClosed(1, 5).forEach(i -> region.put("key-A-" + i, "value" + i));
            IntStream.rangeClosed(6, 10).forEach(i -> region.put("key-B-" + i, "value" + i));

            IntStream.rangeClosed(50, 55).forEach(i -> relatedRegion.put("key-A-X-" + i, "value" + i));
            IntStream.rangeClosed(55, 60).forEach(i -> relatedRegion.put("key-B-Y-" + i, "value" + i));


            Set<DistributedMember> keyAMembers =
                    IntStream
                            .rangeClosed(1, 5)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-A-" + i))
                            .collect(Collectors.toSet());
            Set<DistributedMember> keyBMembers =
                    IntStream
                            .rangeClosed(6, 10)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(region, "key-B-" + i))
                            .collect(Collectors.toSet());

            Set<DistributedMember> keyAXMembers =
                    IntStream
                            .rangeClosed(1, 5)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(relatedRegion, "key-A-X-" + i))
                            .collect(Collectors.toSet());
            Set<DistributedMember> keyBYMembers =
                    IntStream
                            .rangeClosed(6, 10)
                            .mapToObj(i -> PartitionRegionHelper.getPrimaryMemberForKey(relatedRegion, "key-B-Y-" + i))
                            .collect(Collectors.toSet());

            assertThat(keyAMembers.size())
                    .isEqualTo(1);
            assertThat(keyBMembers.size())
                    .isEqualTo(1);
            assertThat(keyAXMembers.size())
                    .isEqualTo(1);
            assertThat(keyBYMembers.size())
                    .isEqualTo(1);

            assertThat(keyAMembers)
                    .isEqualTo(keyAXMembers);
            assertThat(keyBMembers)
                    .isEqualTo(keyBYMembers);
        });
    }

OKですね。

使うにはもう少し前提があって、

  • Regionを定義する時には、先にデータの中心となるRegionを定義すること
  • 関連させるPartitioned Regionは、partition attributesで指定する「recovery-delay」、「redundant-copies」、「startup-recovery-delay」、「total-num-buckets」は同じである必要がある
  • 関連するRegionで、同じPartitionResolverを使うこと
  • Regionのデータをディスクに永続化する場合は、各Regionが同じディスクにデータを保存するようにすること

といった感じみたいです。

まとめ

Apache GeodeのCustom-Partitioning/Colocation Dataを使って、データの配置(グルーピング)をコントロールする方法を見てみました。

このあたりは他のグリッドとそう変わらない概念だったので、あまり困ることはありませんでした。

が、完全に固定化できるとか、複数のRegionでもグルーピングをコントロールできるとは思っていなかったので、ここはちょっと驚きましたね。