ちょっと、ドキュメントを見ていて気になったので。
以前、HazelcastのDistributed Mapを使って、キーの配置状況を確認したことがあります。
HazelcastのDistibuted Mapにおける、キーの分散状況を確認する - CLOVER
これを見て、こうやって分散状況を知れるのかぁ程度に思っていたのですが、ちょっと事情が違う人たちがいたようなので。
注)
このテーマは、別にHazelcastをdisりたいわけではありません
スケールしない人たち
Hazelcastは、Map以外にもListやSetなど多彩なデータ構造を提供していますが、全部が全部、分散配置・スケールするわけではないようです。
Set。
Hazelcast Set is a non-partitioned data structure: all the data that belongs to a set will live on one single partition in that node.
Hazelcast Set cannot be scaled beyond the capacity of a single machine. Since the whole set lives on a single partition, storing large amount of data on a single set may cause memory pressure. Therefore, you should use multiple sets to store large amount of data; this way all the sets will be spread across the cluster, hence sharing the load.
A backup of Hazelcast Set is stored on a partition of another node in the cluster so that data is not lost in the event of a primary node failure.
http://docs.hazelcast.org/docs/3.5/manual/html-single/hazelcast-documentation.html#set
List。
Hazelcast List is a non-partitioned data structure where values and each backup are represented by their own single partition.
Hazelcast List cannot be scaled beyond the capacity of a single machine.
http://docs.hazelcast.org/docs/3.5/manual/html-single/hazelcast-documentation.html#list
というわけで、ドキュメント上はListとSetはスケールしません。ドキュメント上は…。
このような話もあり、前回よりもうちょっとデータ構造を広げて確認してみました。
対象は、
- Map
- MultiMap
- List
- Set
- Queue
- Cache
とします。今回は、TopicsとRingbuffer(Hazelcast 3.5〜)は外しました。あと、Replicated MapはそもそもDistributedでもないですし。
Partitionとは?
Hazelcastはまるっとエントリを持つのではなく、Partitionと呼ばれる単位でメモリを区切ってデータを保持します。デフォルトのPartition数は、271です。どのデータがどのパーティションに配置されるかは、データを登録する際のキーによって決まります。
そう、キーによって。
なので、Mapのようにキーを持つデータ構造は話がわかりやすいですが(事実、Partitionを扱うためのインターフェース、ParitionServiceからPartitionを取得するためには、キーが必要になります)、ListやSetはどうするんだろう?というのが疑問でした。
確認しないままだったんですけど。で、今回改めてドキュメントを読んでみて気付いた、と…。
まあ、それはさておき各データ構造に対して見ていってみましょう。
準備
Maven依存関係は、このように。
<dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> <version>3.5.3</version> </dependency>
Hazelcast 3.5.3を使用して、確認します。
確認方法
ちょっとした、コマンドラインツールを作ります。以下の挙動で。
- all … 対象のデータ構造が保持している、全エントリを出力
- locate [key] … 指定のキーに対応するPartitionの情報を出力
- self … クラスタ内の自分自身の情報を出力
- name … List、Setなどのスケールしないデータ構造向けに、Partitionの情報を出力
- size … 対象のデータ構造のエントリ数を出力
- exit … ツールを終了する
こんな感じです。
これをまとめて実装するために、こんなクラスを用意。
src/main/java/org/littlewings/hazelcast/partitions/HazelcastInterpreterSupport.java
package org.littlewings.hazelcast.partitions; import java.io.Console; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Set; import java.util.function.Consumer; import java.util.function.Supplier; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.Partition; import com.hazelcast.core.PartitionService; import com.hazelcast.instance.HazelcastInstanceProxy; import com.hazelcast.nio.serialization.Data; import com.hazelcast.nio.serialization.SerializationService; import com.hazelcast.partition.strategy.StringPartitioningStrategy; public abstract class HazelcastInterpreterSupport { protected void withHazelcast(Consumer<HazelcastInstance> consumer) { HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); try { logging("Hazelcast startup."); consumer.accept(hazelcast); } finally { hazelcast.getLifecycleService().shutdown(); Hazelcast.shutdownAll(); logging("Hazelcast shutdown."); } } protected abstract void execute(String... args); protected void readConsoleWhile(HazelcastInstance hazelcast, String name, Supplier<Void> showAll, Supplier<Integer> counter) { Console console = System.console(); String line; while ((line = console.readLine("> ")) != null) { if (line.isEmpty()) { continue; } String[] tokens = line.split("\\s+", -1); String command = tokens[0]; boolean stop = false; switch (command) { case "all": showAll.get(); break; case "locate": if (tokens.length > 1) { String key = tokens[1]; Partition partition = hazelcast.getPartitionService().getPartition(key); show("Locate key = %s, partitionId = %s, owner = %s.", key, partition.getPartitionId(), partition.getOwner()); } else { show("Locate, required key."); } break; case "self": show("Self = %s.", hazelcast.getCluster().getLocalMember()); break; case "name": PartitionService partitionService = hazelcast.getPartitionService(); SerializationService serializationService = ((HazelcastInstanceProxy)hazelcast).getSerializationService(); Data key = serializationService.toData(name, StringPartitioningStrategy.INSTANCE); Partition partition = partitionService.getPartition(key); show("Partition by name = %s, partitionId = %s, owner = %s.", name, partition.getPartitionId(), partition.getOwner()); break; case "size": show("This data set size = %d.", counter.get()); break; case "exit": stop = true; break; default: show("Unknown command = %s.", command); break; } if (stop) { break; } } } protected void logging(String format, Object... args) { LocalDateTime now = LocalDateTime.now(); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"); System.out.println("[" + formatter.format(now) + "] " + String.format(format, args)); } protected void show(String format, Object... args) { System.out.println(String.format(format, args)); } }
あとは、各データ構造に応じてクラスを作成していきます。
ここから作成したツールでHazelcastのNodeを3つ起動し、同時に2つkillしてどうなるのか見ようという試みをしてみます。
Map
まずは、オーソドックスにMapから。
src/main/java/org/littlewings/hazelcast/partitions/HazelcastMapInterpreter.java
package org.littlewings.hazelcast.partitions; import java.util.stream.IntStream; import com.hazelcast.core.IMap; import com.hazelcast.core.Partition; public class HazelcastMapInterpreter extends HazelcastInterpreterSupport { public static void main(String... args) { new HazelcastMapInterpreter().execute(args); } @Override protected void execute(String... args) { withHazelcast(hazelcast -> { String name = "default"; IMap<String, Integer> map = hazelcast.getMap(name); if (args.length > 0) { if ("master".equals(args[0])) { IntStream.rangeClosed(1, 10).forEach(i -> map.put("key" + i, i)); } } readConsoleWhile(hazelcast, name, () -> { map .keySet() .forEach(k -> { Partition partition = hazelcast.getPartitionService().getPartition(k); show("key = %s, partitionId = %d, owner = %s.", k, partition.getPartitionId(), partition.getOwner()); }); return null; }, map::size); }); } }
これを、3 Node起動してみます。「-Dexec.args=master」と引数を付けたNodeのみ、データを投入します。
## Node 1 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastMapInterp reter -Dexec.args=master ## Node 2 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastMapInterp reter ## Node 3 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastMapInterp reter
クラスタが構成されます。
Members [3] { Member [172.17.42.1]:5701 this Member [172.17.42.1]:5702 Member [172.17.42.1]:5703 }
ちょっとデータを見てみます。
> size This data set size = 10. > all key = key9, partitionId = 125, owner = Member [172.17.42.1]:5702. key = key8, partitionId = 164, owner = Member [172.17.42.1]:5702. key = key4, partitionId = 15, owner = Member [172.17.42.1]:5703. key = key6, partitionId = 258, owner = Member [172.17.42.1]:5701 this. key = key3, partitionId = 88, owner = Member [172.17.42.1]:5703. key = key1, partitionId = 152, owner = Member [172.17.42.1]:5702. key = key5, partitionId = 58, owner = Member [172.17.42.1]:5703. key = key7, partitionId = 48, owner = Member [172.17.42.1]:5703. key = key10, partitionId = 12, owner = Member [172.17.42.1]:5703. key = key2, partitionId = 221, owner = Member [172.17.42.1]:5701 this. > locate key1 Locate key = key1, partitionId = 152, owner = Member [172.17.42.1]:5702. > locate key2 Locate key = key2, partitionId = 221, owner = Member [172.17.42.1]:5701 this.
自分自身。
> self Self = Member [172.17.42.1]:5701 this.
ここで、Node 1以外の2つのNodeを落としてみます。
$ kill 128299 128266
クラスタ内のNodeが、ひとつになります。
Members [1] { Member [172.17.42.1]:5701 this }
この時のデータの状態ですが、当然ながら欠損します。
> size This data set size = 5. > all key = key9, partitionId = 125, owner = Member [172.17.42.1]:5701 this. key = key6, partitionId = 258, owner = Member [172.17.42.1]:5701 this. key = key3, partitionId = 88, owner = Member [172.17.42.1]:5701 this. key = key7, partitionId = 48, owner = Member [172.17.42.1]:5701 this. key = key2, partitionId = 221, owner = Member [172.17.42.1]:5701 this.
全データが自Nodeプライマリになりますが、自Nodeがバックアップとしても持っていなかったと思われるデータはロストします。そりゃあそうですね。
補足)
Hazelcastのデフォルトのバックアップ数は1なので、Nodeがひとつ落ちる分には問題なく、別Nodeからデータをコピーしてきて戻そうとします。今回は、2つNodeを同時に落とすというケースをやっているので、バックアップも同時に失うことになり、この場合はデータが欠損するということになります。
なお、Mapの場合、Partitionに割り当てるキーは、やっぱり引数のキーから作るみたいですね。
@Override public V put(final K k, final V v, final long ttl, final TimeUnit timeunit) { checkNotNull(k, NULL_KEY_IS_NOT_ALLOWED); checkNotNull(v, NULL_VALUE_IS_NOT_ALLOWED); final Data key = toData(k, partitionStrategy); final Data value = toData(v); final Data result = putInternal(key, value, ttl, timeunit); return (V) toObject(result); }
int partitionId = nodeEngine.getPartitionService().getPartitionId(key);
MultiMap
続いては、同じくキーを持つデータ構造、MultiMap。ソースコードの全体は、端折ります…。
@Override protected void execute(String... args) { withHazelcast(hazelcast -> { String name = "default"; MultiMap<String, Integer> map = hazelcast.getMultiMap(name); if (args.length > 0) { if ("master".equals(args[0])) { IntStream .rangeClosed(1, 10) .forEach(i -> IntStream.rangeClosed(1, 5).forEach(j -> map.put("key" + i, j))); } } readConsoleWhile(hazelcast, name, () -> { map .keySet() .forEach(k -> { Partition partition = hazelcast.getPartitionService().getPartition(k); show("key = %s, values = %s, partitionId = %d, owner = %s.", k, map.get(k), partition.getPartitionId(), partition.getOwner()); }); return null; }, map::size); }); }
同じく、3 Node起動後確認。
> size This data set size = 50. > all key = key1, values = [1, 2, 3, 4, 5], partitionId = 152, owner = Member [172.17.42.1]:5702. key = key2, values = [1, 2, 3, 4, 5], partitionId = 221, owner = Member [172.17.42.1]:5701 this. key = key5, values = [1, 2, 3, 4, 5], partitionId = 58, owner = Member [172.17.42.1]:5703. key = key6, values = [1, 2, 3, 4, 5], partitionId = 258, owner = Member [172.17.42.1]:5701 this. key = key3, values = [1, 2, 3, 4, 5], partitionId = 88, owner = Member [172.17.42.1]:5703. key = key4, values = [1, 2, 3, 4, 5], partitionId = 15, owner = Member [172.17.42.1]:5703. key = key9, values = [1, 2, 3, 4, 5], partitionId = 125, owner = Member [172.17.42.1]:5702. key = key7, values = [1, 2, 3, 4, 5], partitionId = 48, owner = Member [172.17.42.1]:5703. key = key8, values = [1, 2, 3, 4, 5], partitionId = 164, owner = Member [172.17.42.1]:5702. key = key10, values = [1, 2, 3, 4, 5], partitionId = 12, owner = Member [172.17.42.1]:5703.
MultiMapの場合は、MultiMap#sizeがキーに対応するエントリ数に加えて、どのくらいキーに紐づく値があるのかも合算されます。
2つ落として、確認。
> size This data set size = 40. > all key = key1, values = [1, 2, 3, 4, 5], partitionId = 152, owner = Member [172.17.42.1]:5701 this. key = key2, values = [1, 2, 3, 4, 5], partitionId = 221, owner = Member [172.17.42.1]:5701 this. key = key5, values = [1, 2, 3, 4, 5], partitionId = 58, owner = Member [172.17.42.1]:5701 this. key = key6, values = [1, 2, 3, 4, 5], partitionId = 258, owner = Member [172.17.42.1]:5701 this. key = key4, values = [1, 2, 3, 4, 5], partitionId = 15, owner = Member [172.17.42.1]:5701 this. key = key9, values = [1, 2, 3, 4, 5], partitionId = 125, owner = Member [172.17.42.1]:5701 this. key = key8, values = [1, 2, 3, 4, 5], partitionId = 164, owner = Member [172.17.42.1]:5701 this. key = key10, values = [1, 2, 3, 4, 5], partitionId = 12, owner = Member [172.17.42.1]:5701 this.
意外と残りましたが、やっぱり欠損はしましたね。
MultiMapの場合も、Partitionに紐づけるキーは、エントリのキーから作成します。
https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/multimap/impl/ObjectMultiMapProxy.java#L107
https://github.com/hazelcast/hazelcast/blob/v3.5.3/hazelcast/src/main/java/com/hazelcast/multimap/impl/MultiMapProxySupport.java#L236
List
で、ここからはキーを持たないデータ構造。まずはListから。
@Override protected void execute(String... args) { withHazelcast(hazelcast -> { String name = "default"; IList<Integer> list = hazelcast.getList(name); if (args.length > 0) { if ("master".equals(args[0])) { IntStream.rangeClosed(1, 10).forEach(list::add); } } readConsoleWhile(hazelcast, name, () -> { list.forEach(e -> show("element = %d.", e)); return null; }, list::size); }); }
Nodeを3つ起動。
## Node 1 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastListInter preter -Dexec.args=master ## Node 2 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastListInter preter ## Node 3 $ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.partitions.HazelcastListInter preter
確認。
> size This data set size = 10. > all element = 1. element = 2. element = 3. element = 4. element = 5. element = 6. element = 7. element = 8. element = 9. element = 10.
この時、Listなどのために用意した、以下のコマンドを使ってみます。
case "name": PartitionService partitionService = hazelcast.getPartitionService(); SerializationService serializationService = ((HazelcastInstanceProxy)hazelcast).getSerializationService(); Data key = serializationService.toData(name, StringPartitioningStrategy.INSTANCE); Partition partition = partitionService.getPartition(key); show("Partition by name = %s, partitionId = %s, owner = %s.", name, partition.getPartitionId(), partition.getOwner()); break;
Node 2ですと。
> name Partition by name = default, partitionId = 103, owner = Member [172.17.42.1]:5702.
いろいろ試したところ、Node 2をダウンさせると、うちの環境ではこの値がNode 2⇒Node 1と移りやすかったので、Node 1とNode 2をまとめてkill。
$ kill 128965 129035
結果。
> size This data set size = 0. > all
空になりました。
同時にkillではなく、ひとつずつ落としていくとデータは残るのですが、このようにデータを持っているNodeを、バックアップNode含めて同時に狙い撃ちにすると、完全になくなるみたいです。
※バックアップのNodeの情報は普通には得られそうにないので、この方法だと関係ないNodeを落としてしまうことも…
というわけで、Listは単一のNodeに全データを持っているということになりそうですね(バックアップはありますが)。
なんでこのような挙動になっているかですが、HazelcastのList(とSet)の親クラスになっているAbstractCollectionProxyImplクラスの以下の部分で、PartitionIdが固定されているからだと思われます。
protected AbstractCollectionProxyImpl(String name, NodeEngine nodeEngine, S service) { super(nodeEngine, service); this.name = name; this.partitionId = nodeEngine.getPartitionService().getPartitionId(getNameAsPartitionAwareData()); }
このgetNameAsPartitionAwareDataというメソッドは、AbstractDistributedObjectクラスにて定義されています。
protected Data getNameAsPartitionAwareData() { String name = getName(); return getNodeEngine().getSerializationService().toData(name, PARTITIONING_STRATEGY); }
このnameは、分散オブジェクトの名前(今回のコード例でいくと、「default」)。PARTITIONING_STRATEGYというのは、AbstractDistributedObjectクラスに固定で定義されています。
protected static final PartitioningStrategy PARTITIONING_STRATEGY = StringPartitioningStrategy.INSTANCE;
というわけで、いつも同じPartitionを指すわけですね。
この部分を近い形に再現したのが、以下というわけでした。
case "name": PartitionService partitionService = hazelcast.getPartitionService(); SerializationService serializationService = ((HazelcastInstanceProxy)hazelcast).getSerializationService(); Data key = serializationService.toData(name, StringPartitioningStrategy.INSTANCE); Partition partition = partitionService.getPartition(key); show("Partition by name = %s, partitionId = %s, owner = %s.", name, partition.getPartitionId(), partition.getOwner()); break;
Set
Setは、Listと同じ構造(ProxyクラスがAbstractCollectionProxyImplクラスのサブクラス)なので、結果は端折ります。
コードは、こんな感じです。
@Override protected void execute(String... args) { withHazelcast(hazelcast -> { String name = "default"; ISet<Integer> set = hazelcast.getSet(name); if (args.length > 0) { if ("master".equals(args[0])) { IntStream.rangeClosed(1, 10).forEach(set::add); } } readConsoleWhile(hazelcast, name, () -> { set.forEach(e -> show("element = %d.", e)); return null; }, set::size); }); }
Queue
Queueは、ドキュメント上は特に何も書いていませんが、ListやSetと同じ気がします。
コードは、こちら。
@Override protected void execute(String... args) { withHazelcast(hazelcast -> { String name = "default"; IQueue<Integer> queue = hazelcast.getQueue(name); if (args.length > 0) { if ("master".equals(args[0])) { IntStream.rangeClosed(1, 10).forEach(i -> { try { queue.put(i); } catch (InterruptedException e) { } }); } } readConsoleWhile(hazelcast, name, () -> { queue.forEach(e -> show("element = %d.", e)); return null; }, queue::size); }); }
Nodeを3つ起動して、確認。
> size This data set size = 10. > all element = 1. element = 2. element = 3. element = 4. element = 5. element = 6. element = 7. element = 8. element = 9. element = 10.
2つ、落とします。
> size This data set size = 0. > all
バックアップNodeも含めて同時に落とすと、データがなくなりました…。
もちろん、順次落とした場合はこういうことになりませんが、ひとつのNodeでデータをまかなう構造みたいですね。
実装を見たところ、QueueはAbstractCollectionProxyImplクラスを継承しているわけではないのですが、やっていることは同じみたいです。
QueueProxySupport(final String name, final QueueService queueService, NodeEngine nodeEngine) { super(nodeEngine, queueService); this.name = name; this.partitionId = nodeEngine.getPartitionService().getPartitionId(getNameAsPartitionAwareData()); this.config = nodeEngine.getConfig().findQueueConfig(name); }
Partition Id固定、と。
というわけで、List、Setと同じようにひとつのNodeのメモリサイズで頑張りましょうということになりそうですね。
Cache
オマケで、Mapと似た存在のCacheも入れてみました。
Cacheを使う場合は、JCacheへの依存関係が必要になります。
<dependency> <groupId>javax.cache</groupId> <artifactId>cache-api</artifactId> <version>1.0.0</version> </dependency>
コードについては、HazelcastInstanceから直接Cacheを取得できない感じなので、JCacheのAPIからunwrapして引っこ抜きます。
@Override protected void execute(String... args) { String name = "default"; Configuration<String, Integer> configuration = new MutableConfiguration<String, Integer>() .setTypes(String.class, Integer.class); try (CachingProvider cachingProvider = Caching.getCachingProvider(); CacheManager cacheManager = cachingProvider.getCacheManager(); Cache<String, Integer> cache = cacheManager.createCache(name, configuration)) { HazelcastCacheManager hazelcastCacheManager = cacheManager.unwrap(HazelcastServerCacheManager.class); HazelcastInstance hazelcast = hazelcastCacheManager.getHazelcastInstance(); ICache<String, Integer> hazelcastCache = cache.unwrap(ICache.class); if (args.length > 0) { if ("master".equals(args[0])) { IntStream.rangeClosed(1, 10).forEach(i -> hazelcastCache.put("key" + i, i)); } } readConsoleWhile(hazelcast, name, () -> { StreamSupport.stream(hazelcastCache.spliterator(), false) .forEach(entry -> { String k = entry.getKey(); Partition partition = hazelcast.getPartitionService().getPartition(k); show("key = %s, partitionId = %d, owner = %s.", k, partition.getPartitionId(), partition.getOwner()); }); return null; }, hazelcastCache::size); } }
結果は、Mapと一緒で落とす前は
> size This data set size = 10. > all key = key10, partitionId = 12, owner = Member [172.17.42.1]:5703. key = key4, partitionId = 15, owner = Member [172.17.42.1]:5703. key = key7, partitionId = 48, owner = Member [172.17.42.1]:5702. key = key5, partitionId = 58, owner = Member [172.17.42.1]:5702. key = key3, partitionId = 88, owner = Member [172.17.42.1]:5702. key = key9, partitionId = 125, owner = Member [172.17.42.1]:5702. key = key1, partitionId = 152, owner = Member [172.17.42.1]:5703. key = key8, partitionId = 164, owner = Member [172.17.42.1]:5703. key = key2, partitionId = 221, owner = Member [172.17.42.1]:5701 this. key = key6, partitionId = 258, owner = Member [172.17.42.1]:5701 this.
だったのが、2つ同時に落とすと
> size This data set size = 5. > all key = key10, partitionId = 12, owner = Member [172.17.42.1]:5701 this. key = key4, partitionId = 15, owner = Member [172.17.42.1]:5701 this. key = key9, partitionId = 125, owner = Member [172.17.42.1]:5701 this. key = key2, partitionId = 221, owner = Member [172.17.42.1]:5701 this. key = key6, partitionId = 258, owner = Member [172.17.42.1]:5701 this.
データが一部欠損します、と。
まとめ
Hazelcastのデータ構造のうち、(TopicsとRingbufferは確認していないですが)ListやSet、Queueなど一部のデータ構造はスケールしない=ひとつのNodeに全部格納されるという挙動を確認しました。
ドキュメントに記載はありますし、キーをもとに分散しているので、そりゃあそうなるよねぇという気も。こういうの見てると、KVSって感じがしますね。
ソースコードもそこそこ追いましたし、よい勉強になりました。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-collection-partitions