CLOVER🍀

That was when it all began.

HazelcastのList、Set、Queueがスケールしないという話

ちょっと、ドキュメントを見ていて気になったので。

以前、HazelcastのDistributed Mapを使って、キーの配置状況を確認したことがあります。

HazelcastのDistibuted Mapにおける、キーの分散状況を確認する - CLOVER

これを見て、こうやって分散状況を知れるのかぁ程度に思っていたのですが、ちょっと事情が違う人たちがいたようなので。

注)
このテーマは、別にHazelcastをdisりたいわけではありません

スケールしない人たち

Hazelcastは、Map以外にもListやSetなど多彩なデータ構造を提供していますが、全部が全部、分散配置・スケールするわけではないようです。

Set。

Hazelcast Set is a non-partitioned data structure: all the data that belongs to a set will live on one single partition in that node.

Hazelcast Set cannot be scaled beyond the capacity of a single machine. Since the whole set lives on a single partition, storing large amount of data on a single set may cause memory pressure. Therefore, you should use multiple sets to store large amount of data; this way all the sets will be spread across the cluster, hence sharing the load.

A backup of Hazelcast Set is stored on a partition of another node in the cluster so that data is not lost in the event of a primary node failure.

http://docs.hazelcast.org/docs/3.5/manual/html-single/hazelcast-documentation.html#set

List。

Hazelcast List is a non-partitioned data structure where values and each backup are represented by their own single partition.

Hazelcast List cannot be scaled beyond the capacity of a single machine.

http://docs.hazelcast.org/docs/3.5/manual/html-single/hazelcast-documentation.html#list

というわけで、ドキュメント上はListとSetはスケールしません。ドキュメント上は…。

このような話もあり、前回よりもうちょっとデータ構造を広げて確認してみました。

対象は、

  • Map
  • MultiMap
  • List
  • Set
  • Queue
  • Cache

とします。今回は、TopicsとRingbuffer(Hazelcast 3.5〜)は外しました。あと、Replicated MapはそもそもDistributedでもないですし。

Partitionとは?

Hazelcastはまるっとエントリを持つのではなく、Partitionと呼ばれる単位でメモリを区切ってデータを保持します。デフォルトのPartition数は、271です。どのデータがどのパーティションに配置されるかは、データを登録する際のキーによって決まります。

そう、キーによって。

なので、Mapのようにキーを持つデータ構造は話がわかりやすいですが(事実、Partitionを扱うためのインターフェース、ParitionServiceからPartitionを取得するためには、キーが必要になります)、ListやSetはどうするんだろう?というのが疑問でした。

確認しないままだったんですけど。で、今回改めてドキュメントを読んでみて気付いた、と…。

まあ、それはさておき各データ構造に対して見ていってみましょう。

準備

Maven依存関係は、このように。

        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>3.5.3</version>
        </dependency>

Hazelcast 3.5.3を使用して、確認します。

確認方法

ちょっとした、コマンドラインツールを作ります。以下の挙動で。

  • all … 対象のデータ構造が保持している、全エントリを出力
  • locate [key] … 指定のキーに対応するPartitionの情報を出力
  • self … クラスタ内の自分自身の情報を出力
  • name … List、Setなどのスケールしないデータ構造向けに、Partitionの情報を出力
  • size … 対象のデータ構造のエントリ数を出力
  • exit … ツールを終了する

こんな感じです。

これをまとめて実装するために、こんなクラスを用意。
src/main/java/org/littlewings/hazelcast/partitions/HazelcastInterpreterSupport.java

package org.littlewings.hazelcast.partitions;

import java.io.Console;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Partition;
import com.hazelcast.core.PartitionService;
import com.hazelcast.instance.HazelcastInstanceProxy;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.SerializationService;
import com.hazelcast.partition.strategy.StringPartitioningStrategy;

public abstract class HazelcastInterpreterSupport {
    protected void withHazelcast(Consumer<HazelcastInstance> consumer) {
        HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();

        try {
            logging("Hazelcast startup.");
            consumer.accept(hazelcast);
        } finally {
            hazelcast.getLifecycleService().shutdown();
            Hazelcast.shutdownAll();
            logging("Hazelcast shutdown.");
        }
    }

    protected abstract void execute(String... args);

    protected void readConsoleWhile(HazelcastInstance hazelcast, String name, Supplier<Void> showAll, Supplier<Integer> counter) {
        Console console = System.console();
        String line;
        while ((line = console.readLine("> ")) != null) {
            if (line.isEmpty()) {
                continue;
            }

            String[] tokens = line.split("\\s+", -1);
            String command = tokens[0];
            boolean stop = false;

            switch (command) {
                case "all":
                    showAll.get();
                    break;
                case "locate":
                    if (tokens.length > 1) {
                        String key = tokens[1];
                        Partition partition = hazelcast.getPartitionService().getPartition(key);
                        show("Locate key = %s, partitionId = %s, owner = %s.", key, partition.getPartitionId(), partition.getOwner());
                    } else {
                        show("Locate, required key.");
                    }
                    break;
                case "self":
                    show("Self = %s.", hazelcast.getCluster().getLocalMember());
                    break;
                case "name":
                    PartitionService partitionService = hazelcast.getPartitionService();
                    SerializationService serializationService = ((HazelcastInstanceProxy)hazelcast).getSerializationService();
                    Data key = serializationService.toData(name, StringPartitioningStrategy.INSTANCE);
                    Partition partition = partitionService.getPartition(key);
                    show("Partition by name = %s, partitionId = %s, owner = %s.", name, partition.getPartitionId(), partition.getOwner());
                    break;
                case "size":
                    show("This data set size = %d.", counter.get());
                    break;
                case "exit":
                    stop = true;
                    break;
                default:
                    show("Unknown command = %s.", command);
                    break;
            }

            if (stop) {
                break;
            }
        }
    }

    protected void logging(String format, Object... args) {
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
        System.out.println("[" + formatter.format(now) + "] " + String.format(format, args));
    }

    protected void show(String format, Object... args) {
        System.out.println(String.format(format, args));
    }
}

あとは、各データ構造に応じてクラスを作成していきます。

ここから作成したツールでHazelcastのNodeを3つ起動し、同時に2つkillしてどうなるのか見ようという試みをしてみます。

Map

まずは、オーソドックスにMapから。
src/main/java/org/littlewings/hazelcast/partitions/HazelcastMapInterpreter.java

package org.littlewings.hazelcast.partitions;

import java.util.stream.IntStream;

import com.hazelcast.core.IMap;
import com.hazelcast.core.Partition;

public class HazelcastMapInterpreter extends HazelcastInterpreterSupport {
    public static void main(String... args) {
        new HazelcastMapInterpreter().execute(args);
    }

    @Override
    protected void execute(String... args) {
        withHazelcast(hazelcast -> {
            String name = "default";
            IMap<String, Integer> map = hazelcast.getMap(name);

            if (args.length > 0) {
                if ("master".equals(args[0])) {
                    IntStream.rangeClosed(1, 10).forEach(i -> map.put("key" + i, i));
                }
            }

            readConsoleWhile(hazelcast,
                    name,
                    () -> {
                        map
                                .keySet()
                                .forEach(k -> {
                                    Partition partition = hazelcast.getPartitionService().getPartition(k);
                                    show("key = %s, partitionId = %d, owner = %s.", k, partition.getPartitionId(), partition.getOwner());
                                });
                        return null;
                    },
                    map::size);
        });
    }
}

これを、3 Node起動してみます。「-Dexec.args=master」と引数を付けたNodeのみ、データを投入します。

## Node 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastMapInterp
reter -Dexec.args=master

## Node 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastMapInterp
reter

## Node 3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastMapInterp
reter

クラスタが構成されます。

Members [3] {
	Member [172.17.42.1]:5701 this
	Member [172.17.42.1]:5702
	Member [172.17.42.1]:5703
}

ちょっとデータを見てみます。

> size
This data set size = 10.
> all
key = key9, partitionId = 125, owner = Member [172.17.42.1]:5702.
key = key8, partitionId = 164, owner = Member [172.17.42.1]:5702.
key = key4, partitionId = 15, owner = Member [172.17.42.1]:5703.
key = key6, partitionId = 258, owner = Member [172.17.42.1]:5701 this.
key = key3, partitionId = 88, owner = Member [172.17.42.1]:5703.
key = key1, partitionId = 152, owner = Member [172.17.42.1]:5702.
key = key5, partitionId = 58, owner = Member [172.17.42.1]:5703.
key = key7, partitionId = 48, owner = Member [172.17.42.1]:5703.
key = key10, partitionId = 12, owner = Member [172.17.42.1]:5703.
key = key2, partitionId = 221, owner = Member [172.17.42.1]:5701 this.
> locate key1
Locate key = key1, partitionId = 152, owner = Member [172.17.42.1]:5702.
> locate key2
Locate key = key2, partitionId = 221, owner = Member [172.17.42.1]:5701 this.

自分自身。

> self
Self = Member [172.17.42.1]:5701 this.

ここで、Node 1以外の2つのNodeを落としてみます。

$ kill 128299 128266

クラスタ内のNodeが、ひとつになります。

Members [1] {
	Member [172.17.42.1]:5701 this
}

この時のデータの状態ですが、当然ながら欠損します。

> size
This data set size = 5.
> all
key = key9, partitionId = 125, owner = Member [172.17.42.1]:5701 this.
key = key6, partitionId = 258, owner = Member [172.17.42.1]:5701 this.
key = key3, partitionId = 88, owner = Member [172.17.42.1]:5701 this.
key = key7, partitionId = 48, owner = Member [172.17.42.1]:5701 this.
key = key2, partitionId = 221, owner = Member [172.17.42.1]:5701 this.

全データが自Nodeプライマリになりますが、自Nodeがバックアップとしても持っていなかったと思われるデータはロストします。そりゃあそうですね。

補足)
Hazelcastのデフォルトのバックアップ数は1なので、Nodeがひとつ落ちる分には問題なく、別Nodeからデータをコピーしてきて戻そうとします。今回は、2つNodeを同時に落とすというケースをやっているので、バックアップも同時に失うことになり、この場合はデータが欠損するということになります。

なお、Mapの場合、Partitionに割り当てるキーは、やっぱり引数のキーから作るみたいですね。

    @Override
    public V put(final K k, final V v, final long ttl, final TimeUnit timeunit) {
        checkNotNull(k, NULL_KEY_IS_NOT_ALLOWED);
        checkNotNull(v, NULL_VALUE_IS_NOT_ALLOWED);

        final Data key = toData(k, partitionStrategy);
        final Data value = toData(v);
        final Data result = putInternal(key, value, ttl, timeunit);
        return (V) toObject(result);
    }

https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/map/impl/proxy/MapProxyImpl.java#L95

 int partitionId = nodeEngine.getPartitionService().getPartitionId(key);

https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/map/impl/proxy/MapProxySupport.java#L444

MultiMap

続いては、同じくキーを持つデータ構造、MultiMap。ソースコードの全体は、端折ります…。

    @Override
    protected void execute(String... args) {
        withHazelcast(hazelcast -> {
            String name = "default";
            MultiMap<String, Integer> map = hazelcast.getMultiMap(name);

            if (args.length > 0) {
                if ("master".equals(args[0])) {
                    IntStream
                            .rangeClosed(1, 10)
                            .forEach(i ->
                                    IntStream.rangeClosed(1, 5).forEach(j -> map.put("key" + i, j)));
                }
            }

            readConsoleWhile(hazelcast,
                    name,
                    () -> {
                        map
                                .keySet()
                                .forEach(k -> {
                                    Partition partition = hazelcast.getPartitionService().getPartition(k);
                                    show("key = %s, values = %s, partitionId = %d, owner = %s.",
                                            k,
                                            map.get(k),
                                            partition.getPartitionId(),
                                            partition.getOwner());
                                });
                        return null;
                    },
                    map::size);
        });
    }

同じく、3 Node起動後確認。

> size
This data set size = 50.
> all
key = key1, values = [1, 2, 3, 4, 5], partitionId = 152, owner = Member [172.17.42.1]:5702.
key = key2, values = [1, 2, 3, 4, 5], partitionId = 221, owner = Member [172.17.42.1]:5701 this.
key = key5, values = [1, 2, 3, 4, 5], partitionId = 58, owner = Member [172.17.42.1]:5703.
key = key6, values = [1, 2, 3, 4, 5], partitionId = 258, owner = Member [172.17.42.1]:5701 this.
key = key3, values = [1, 2, 3, 4, 5], partitionId = 88, owner = Member [172.17.42.1]:5703.
key = key4, values = [1, 2, 3, 4, 5], partitionId = 15, owner = Member [172.17.42.1]:5703.
key = key9, values = [1, 2, 3, 4, 5], partitionId = 125, owner = Member [172.17.42.1]:5702.
key = key7, values = [1, 2, 3, 4, 5], partitionId = 48, owner = Member [172.17.42.1]:5703.
key = key8, values = [1, 2, 3, 4, 5], partitionId = 164, owner = Member [172.17.42.1]:5702.
key = key10, values = [1, 2, 3, 4, 5], partitionId = 12, owner = Member [172.17.42.1]:5703.

MultiMapの場合は、MultiMap#sizeがキーに対応するエントリ数に加えて、どのくらいキーに紐づく値があるのかも合算されます。

2つ落として、確認。

> size
This data set size = 40.
> all
key = key1, values = [1, 2, 3, 4, 5], partitionId = 152, owner = Member [172.17.42.1]:5701 this.
key = key2, values = [1, 2, 3, 4, 5], partitionId = 221, owner = Member [172.17.42.1]:5701 this.
key = key5, values = [1, 2, 3, 4, 5], partitionId = 58, owner = Member [172.17.42.1]:5701 this.
key = key6, values = [1, 2, 3, 4, 5], partitionId = 258, owner = Member [172.17.42.1]:5701 this.
key = key4, values = [1, 2, 3, 4, 5], partitionId = 15, owner = Member [172.17.42.1]:5701 this.
key = key9, values = [1, 2, 3, 4, 5], partitionId = 125, owner = Member [172.17.42.1]:5701 this.
key = key8, values = [1, 2, 3, 4, 5], partitionId = 164, owner = Member [172.17.42.1]:5701 this.
key = key10, values = [1, 2, 3, 4, 5], partitionId = 12, owner = Member [172.17.42.1]:5701 this.

意外と残りましたが、やっぱり欠損はしましたね。

MultiMapの場合も、Partitionに紐づけるキーは、エントリのキーから作成します。
https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/multimap/impl/ObjectMultiMapProxy.java#L107
https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/multimap/impl/MultiMapProxySupport.java#L236

List

で、ここからはキーを持たないデータ構造。まずはListから。

    @Override
    protected void execute(String... args) {
        withHazelcast(hazelcast -> {
            String name = "default";
            IList<Integer> list = hazelcast.getList(name);

            if (args.length > 0) {
                if ("master".equals(args[0])) {
                    IntStream.rangeClosed(1, 10).forEach(list::add);
                }
            }

            readConsoleWhile(hazelcast,
                    name,
                    () -> {
                        list.forEach(e -> show("element = %d.", e));
                        return null;
                    },
                    list::size);
        });
    }

Nodeを3つ起動。

## Node 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastListInter
preter -Dexec.args=master

## Node 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastListInter
preter

## Node 3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastListInter
preter

確認。

> size
This data set size = 10.
> all
element = 1.
element = 2.
element = 3.
element = 4.
element = 5.
element = 6.
element = 7.
element = 8.
element = 9.
element = 10.

この時、Listなどのために用意した、以下のコマンドを使ってみます。

                case "name":
                    PartitionService partitionService = hazelcast.getPartitionService();
                    SerializationService serializationService = ((HazelcastInstanceProxy)hazelcast).getSerializationService();
                    Data key = serializationService.toData(name, StringPartitioningStrategy.INSTANCE);
                    Partition partition = partitionService.getPartition(key);
                    show("Partition by name = %s, partitionId = %s, owner = %s.", name, partition.getPartitionId(), partition.getOwner());
                    break;

Node 2ですと。

> name
Partition by name = default, partitionId = 103, owner = Member [172.17.42.1]:5702.

いろいろ試したところ、Node 2をダウンさせると、うちの環境ではこの値がNode 2⇒Node 1と移りやすかったので、Node 1とNode 2をまとめてkill。

$ kill 128965 129035

結果。

> size
This data set size = 0.
> all

空になりました。

同時にkillではなく、ひとつずつ落としていくとデータは残るのですが、このようにデータを持っているNodeを、バックアップNode含めて同時に狙い撃ちにすると、完全になくなるみたいです。
※バックアップのNodeの情報は普通には得られそうにないので、この方法だと関係ないNodeを落としてしまうことも…

というわけで、Listは単一のNodeに全データを持っているということになりそうですね(バックアップはありますが)。

なんでこのような挙動になっているかですが、HazelcastのList(とSet)の親クラスになっているAbstractCollectionProxyImplクラスの以下の部分で、PartitionIdが固定されているからだと思われます。

    protected AbstractCollectionProxyImpl(String name, NodeEngine nodeEngine, S service) {
        super(nodeEngine, service);
        this.name = name;
        this.partitionId = nodeEngine.getPartitionService().getPartitionId(getNameAsPartitionAwareData());
    }

https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/collection/impl/collection/AbstractCollectionProxyImpl.java#L64

このgetNameAsPartitionAwareDataというメソッドは、AbstractDistributedObjectクラスにて定義されています。

    protected Data getNameAsPartitionAwareData() {
        String name = getName();
        return getNodeEngine().getSerializationService().toData(name, PARTITIONING_STRATEGY);
    }

https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/spi/AbstractDistributedObject.java#L43

このnameは、分散オブジェクトの名前(今回のコード例でいくと、「default」)。PARTITIONING_STRATEGYというのは、AbstractDistributedObjectクラスに固定で定義されています。

 protected static final PartitioningStrategy PARTITIONING_STRATEGY = StringPartitioningStrategy.INSTANCE;

https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/spi/AbstractDistributedObject.java#L31

というわけで、いつも同じPartitionを指すわけですね。

この部分を近い形に再現したのが、以下というわけでした。

                case "name":
                    PartitionService partitionService = hazelcast.getPartitionService();
                    SerializationService serializationService = ((HazelcastInstanceProxy)hazelcast).getSerializationService();
                    Data key = serializationService.toData(name, StringPartitioningStrategy.INSTANCE);
                    Partition partition = partitionService.getPartition(key);
                    show("Partition by name = %s, partitionId = %s, owner = %s.", name, partition.getPartitionId(), partition.getOwner());
                    break;

Set

Setは、Listと同じ構造(ProxyクラスがAbstractCollectionProxyImplクラスのサブクラス)なので、結果は端折ります。

コードは、こんな感じです。

    @Override
    protected void execute(String... args) {
        withHazelcast(hazelcast -> {
            String name = "default";
            ISet<Integer> set = hazelcast.getSet(name);

            if (args.length > 0) {
                if ("master".equals(args[0])) {
                    IntStream.rangeClosed(1, 10).forEach(set::add);
                }
            }

            readConsoleWhile(hazelcast,
                    name,
                    () -> {
                        set.forEach(e -> show("element = %d.", e));
                        return null;
                    },
                    set::size);
        });
    }

Queue

Queueは、ドキュメント上は特に何も書いていませんが、ListやSetと同じ気がします。

コードは、こちら。

    @Override
    protected void execute(String... args) {
        withHazelcast(hazelcast -> {
            String name = "default";
            IQueue<Integer> queue = hazelcast.getQueue(name);

            if (args.length > 0) {
                if ("master".equals(args[0])) {
                    IntStream.rangeClosed(1, 10).forEach(i -> {
                        try {
                            queue.put(i);
                        } catch (InterruptedException e) {

                        }
                    });
                }
            }

            readConsoleWhile(hazelcast,
                    name,
                    () -> {
                        queue.forEach(e -> show("element = %d.", e));
                        return null;
                    },
                    queue::size);
        });
    }

Nodeを3つ起動して、確認。

> size
This data set size = 10.
> all
element = 1.
element = 2.
element = 3.
element = 4.
element = 5.
element = 6.
element = 7.
element = 8.
element = 9.
element = 10.

2つ、落とします。

> size
This data set size = 0.
> all

バックアップNodeも含めて同時に落とすと、データがなくなりました…。

もちろん、順次落とした場合はこういうことになりませんが、ひとつのNodeでデータをまかなう構造みたいですね。

実装を見たところ、QueueはAbstractCollectionProxyImplクラスを継承しているわけではないのですが、やっていることは同じみたいです。

    QueueProxySupport(final String name, final QueueService queueService, NodeEngine nodeEngine) {
        super(nodeEngine, queueService);
        this.name = name;
        this.partitionId = nodeEngine.getPartitionService().getPartitionId(getNameAsPartitionAwareData());
        this.config = nodeEngine.getConfig().findQueueConfig(name);
    }

https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/collection/impl/queue/QueueProxySupport.java#L63

Partition Id固定、と。

というわけで、List、Setと同じようにひとつのNodeのメモリサイズで頑張りましょうということになりそうですね。

Cache

オマケで、Mapと似た存在のCacheも入れてみました。

Cacheを使う場合は、JCacheへの依存関係が必要になります。

        <dependency>
            <groupId>javax.cache</groupId>
            <artifactId>cache-api</artifactId>
            <version>1.0.0</version>
        </dependency>

コードについては、HazelcastInstanceから直接Cacheを取得できない感じなので、JCacheのAPIからunwrapして引っこ抜きます。

    @Override
    protected void execute(String... args) {
        String name = "default";

        Configuration<String, Integer> configuration =
                new MutableConfiguration<String, Integer>()
                        .setTypes(String.class, Integer.class);

        try (CachingProvider cachingProvider = Caching.getCachingProvider();
             CacheManager cacheManager = cachingProvider.getCacheManager();
             Cache<String, Integer> cache = cacheManager.createCache(name, configuration)) {
            HazelcastCacheManager hazelcastCacheManager = cacheManager.unwrap(HazelcastServerCacheManager.class);
            HazelcastInstance hazelcast = hazelcastCacheManager.getHazelcastInstance();
            ICache<String, Integer> hazelcastCache = cache.unwrap(ICache.class);

            if (args.length > 0) {
                if ("master".equals(args[0])) {
                    IntStream.rangeClosed(1, 10).forEach(i -> hazelcastCache.put("key" + i, i));
                }
            }

            readConsoleWhile(hazelcast,
                    name,
                    () -> {
                        StreamSupport.stream(hazelcastCache.spliterator(), false)
                                .forEach(entry -> {
                                    String k = entry.getKey();
                                    Partition partition = hazelcast.getPartitionService().getPartition(k);
                                    show("key = %s, partitionId = %d, owner = %s.", k, partition.getPartitionId(), partition.getOwner());
                                });
                        return null;
                    },
                    hazelcastCache::size);
        }
    }

結果は、Mapと一緒で落とす前は

> size
This data set size = 10.
> all
key = key10, partitionId = 12, owner = Member [172.17.42.1]:5703.
key = key4, partitionId = 15, owner = Member [172.17.42.1]:5703.
key = key7, partitionId = 48, owner = Member [172.17.42.1]:5702.
key = key5, partitionId = 58, owner = Member [172.17.42.1]:5702.
key = key3, partitionId = 88, owner = Member [172.17.42.1]:5702.
key = key9, partitionId = 125, owner = Member [172.17.42.1]:5702.
key = key1, partitionId = 152, owner = Member [172.17.42.1]:5703.
key = key8, partitionId = 164, owner = Member [172.17.42.1]:5703.
key = key2, partitionId = 221, owner = Member [172.17.42.1]:5701 this.
key = key6, partitionId = 258, owner = Member [172.17.42.1]:5701 this.

だったのが、2つ同時に落とすと

> size
This data set size = 5.
> all
key = key10, partitionId = 12, owner = Member [172.17.42.1]:5701 this.
key = key4, partitionId = 15, owner = Member [172.17.42.1]:5701 this.
key = key9, partitionId = 125, owner = Member [172.17.42.1]:5701 this.
key = key2, partitionId = 221, owner = Member [172.17.42.1]:5701 this.
key = key6, partitionId = 258, owner = Member [172.17.42.1]:5701 this.

データが一部欠損します、と。

実装も、Mapと近いですね。
https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractInternalCacheProxy.java#L162

まとめ

Hazelcastのデータ構造のうち、(TopicsとRingbufferは確認していないですが)ListやSet、Queueなど一部のデータ構造はスケールしない=ひとつのNodeに全部格納されるという挙動を確認しました。

ドキュメントに記載はありますし、キーをもとに分散しているので、そりゃあそうなるよねぇという気も。こういうの見てると、KVSって感じがしますね。

ソースコードもそこそこ追いましたし、よい勉強になりました。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-collection-partitions