CLOVER🍀

That was when it all began.

HazelcastのReplicated Mapを試してみる - その2

先ほど、Hazelcast 3.3から追加された、Replicated Mapの触り的なエントリを書きました。

HazelcastのReplicated Mapを試してみる - その1
http://d.hatena.ne.jp/Kazuhira/20141012/1413104805

これに続いて、もう少し設定に踏み込んだ内容を書きたいと思います。

Maven依存関係には、先ほど同様に以下が定義されているものとします。

    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast</artifactId>
      <version>3.3.1</version>
    </dependency>

Replicated Mapに設定するのは、以下の項目になります。

項目名 意味 デフォルト値
async-fillup クラスタに新しいNodeが参加した時に、他のNodeが持つエントリのレプリケートを非同期にするかどうか。デフォルトは非同期(true)。非同期の場合、レプリケートできていないエントリへのアクセスはnullを返却し、存在もしていないことになる。またこの場合の書き込みはVector Clockの初期化が終わっていないケースにはロストすることがある。falseにすると、レプリケートが終了するまでブロックする true
replication-delay-millis レプリケーションを開始する遅延時間を設定する。これにより、複数の更新の際のオーバーヘッドを最少にする。また、OutOfMemoryError回避のため、レプリケーションには1000のハードリミットが設定されている。この遅延を0ミリ秒に設定することもできるが、レプリケーションの遅延とオーバーヘッドのトレードオフとなる 100
concurrency-level Replicated Mapの、ミューテックスとセグメントの数を定義するのに使用される。これにより、キーのhashCodeの計算結果によるミューテックス/セグメントの選択を調整し、並行性のレベルを変更する 32
in-memory-format Replicated Map中で、エントリをどのように保持するかを設定する。OBJECTかBINARYが設定可能。OBJECTでは、デシリアライズされた状態で保持され、レプリケーションも高速である。注意点は、Map#putしないと変更は他のNodeに伝播しない。BINARYはシリアライズされた状態で保存され、リクエストごとにデシリアライズされる。より高いデータのカプセル化を提供する OBJECT

とまあ、こんな感じ…。あとはstatistics-enabledとentry-listenersがありますが、端折ります。

in-memory-formatは、Distributed MapのデフォルトはBINARYですが、Replicated MapだとOBJECTなんですね。

ちょっと確認してみる

とまあ、こんな設定を見ながら一部設定を確認してみるサンプルを書いてみました。

ベースになっているのは、先のエントリで書いたサンプルです。
src/main/java/org/littlewings/hazelcast/replicatedmap/HazelcastReplicatedMapExample.java

package org.littlewings.hazelcast.replicatedmap;

import java.time.LocalDateTime;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.config.Config;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.ReplicatedMap;

public class HazelcastReplicatedMapExample {
    public static void main(String[] args) {
        String mapName;
        int entrySize;
        if (args.length > 1) {
            mapName = args[0];
            entrySize = Integer.decode(args[1]);
        } else {
            mapName = "default";
            entrySize = 20;
        }

        Instant start = Instant.now();

        HazelcastInstance hazelcast =
            Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml"));

        Map<String, String> map = hazelcast.getReplicatedMap(mapName);
        // Map<String, String> map = hazelcast.getMap(mapName);

        if (map instanceof ReplicatedMap) {
            ReplicatedMap<String, String> rmap = (ReplicatedMap<String, String>)map;
            rmap.addEntryListener(new MyEntryListener());
        } else if (map instanceof IMap) {
            IMap<String, String> rmap = (IMap<String, String>)map;
            rmap.addEntryListener(new MyEntryListener(), true);
        }

        String lastKey = "key" + entrySize;
        System.out.printf("last-key = [%s], value = [%s], exists = [%b]%n",
                          lastKey,
                          map.get("key" + entrySize),
                          map.containsKey("key" + entrySize));

        Instant createdTime = Instant.now();
        Duration timeElapsed = Duration.between(start, createdTime);
        System.out.printf("Cluster Joined Time = [%d] millis%n", timeElapsed.toMillis());

        IntStream
            .rangeClosed(1, entrySize)
            .forEach(i -> map.put("key" + i, "value" + i));

        System.out.printf("[%s] Hazelcast Node, startup, putted [%d]entries.%n",
                          LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), entrySize);
        System.console().readLine("Enter, shutdown...");

        IntStream
            .rangeClosed(1, entrySize)
            .forEach(i -> {
                String key = "key" + i;
                String value = map.get(key);
                System.out.printf("Key = [%s], Value = [%s]%n", key, value);
            });

        hazelcast.getLifecycleService();
        Hazelcast.shutdownAll();
    }

    public static class MyEntryListener implements EntryListener<String, String> {
        @Override
        public void entryAdded(EntryEvent<String, String> event) {
            // System.out.println("Entry added: " + event);
        }

        @Override
        public void entryUpdated(EntryEvent<String, String> event) {
            // System.out.println("Entry updated: " + event);
        }

        @Override
        public void entryRemoved(EntryEvent<String, String> event) {
            // System.out.println("Entry removed: " + event);
        }

        @Override
        public void entryEvicted(EntryEvent<String, String> event) {
            // Currently not supported, will never fire
        }

        @Override
        public void mapCleared(MapEvent event) {
        }

        @Override
        public void mapEvicted(MapEvent event) {
        }
    }
}

比較のために、Distributed Mapを含めていますが、こちらはコメントアウトして今回使用していません。

起動引数に、Mapの名前と作成するエントリ数を受けることにしています。

        String mapName;
        int entrySize;
        if (args.length > 1) {
            mapName = args[0];
            entrySize = Integer.decode(args[1]);
        } else {
            mapName = "default";
            entrySize = 20;
        }

Listenerの設定と、最終キーとそれに紐付く値が取得できるか、確認するコード。

        Map<String, String> map = hazelcast.getReplicatedMap(mapName);
        // Map<String, String> map = hazelcast.getMap(mapName);

        if (map instanceof ReplicatedMap) {
            ReplicatedMap<String, String> rmap = (ReplicatedMap<String, String>)map;
            rmap.addEntryListener(new MyEntryListener());
        } else if (map instanceof IMap) {
            IMap<String, String> rmap = (IMap<String, String>)map;
            rmap.addEntryListener(new MyEntryListener(), true);
        }

        String lastKey = "key" + entrySize;
        System.out.printf("last-key = [%s], value = [%s], exists = [%b]%n",
                          lastKey,
                          map.get("key" + entrySize),
                          map.containsKey("key" + entrySize));

Listenerは、add、update、removeをサポートしていて、evictionはサポートしていないそうです。

サンプルではSystem.outするものですが、実際に書き出すと大量に出力されるので、コメントアウトしました…。

Replicated Mapで使用するListenerの注意点は、イベントを発生させたNodeでのみイベントが発火する、ということです。Distributed Mapの場合は、イベントを発生させたNodeとバックアップNodeでイベントが発火するので、ここは動作が異なります。

あとは、設定ファイルを用意。
src/main/resources/hazelcast.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.3.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <replicatedmap name="default">
    <!-- replicatedmap default -->
    <in-memory-format>OBJECT</in-memory-format>
    <concurrency-level>32</concurrency-level>
    <replication-delay-millis>100</replication-delay-millis>
    <async-fillup>true</async-fillup>
  </replicatedmap>

  <replicatedmap name="sync">
    <in-memory-format>OBJECT</in-memory-format>
    <concurrency-level>32</concurrency-level>
    <replication-delay-millis>0</replication-delay-millis>
    <async-fillup>false</async-fillup>
  </replicatedmap>
</hazelcast>

名前が「default」のものと、「sync」のものを用意しました。「sync」では、async-fillupをfalseにして「replication-delay-millis」を0ミリ秒にしています。

あと実行ですが、mvn exec:javaが面倒だったのでmaven-shade-pluginでひとつのJARにしました。

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <configuration>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>org.littlewings.hazelcast.replicatedmap.HazelcastReplicatedMapExample</mainClass>
            </transformer>
          </transformers>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

それでは、試してみます。ここでは、async-fillupの動作を確認してみましょう。

まず「default」のReplicated Mapで、エントリ数100000でひとつNodeを起動します。

$ java -Xmx1g -jatarget/hazelcast-replicated-map-0.0.1-SNAPSHOT.jar default 100000

ひとつめ、起動しました。

last-key = [key100000], value = [null], exists = [false]
Cluster Joined Time = [6396] millis
[2014-10-12 17:51:49] Hazelcast Node, startup, putted [100000]entries.
Enter, shutdown...

最初のNodeなので、当然最終キーに紐付く値はありません。

ここで、もうひとつNodeを起動。

$ java -Xmx1g -jar target/hazelcast-replicated-map-0.0.1-SNAPSHOT.jar default 100000

こちらでは、こういう表示になります。

last-key = [key100000], value = [null], exists = [false]
Cluster Joined Time = [10650] millis
[2014-10-12 17:52:37] Hazelcast Node, startup, putted [100000]entries.
Enter, shutdown...

最終キーに紐付く値がnullで、存在しないことになっています。また起動し終わったメッセージが出るまで(エントリを登録しきるまで)、ちょっと時間がかかりました。

次に、起動引数を変えて、Map名を「sync」にしてみます。これで、async-fillupが無効になり、レプリケーションの遅延も極小になるはず…。

$ java -Xmx1g -jatarget/hazelcast-replicated-map-0.0.1-SNAPSHOT.jar sync 100000

2つ目のNodeも起動。

$ java -Xmx1g -jatarget/hazelcast-replicated-map-0.0.1-SNAPSHOT.jar sync 100000

こちらでは、最終キーが取得可能に。

last-key = [key100000], value = [value100000], exists = [true]
Cluster Joined Time = [11093] millis
[2014-10-12 17:56:26] Hazelcast Node, startup, putted [100000]entries.
Enter, shutdown...

なんですけど、たまに起動時に取得できない時もあったり…なんか間違ってる??
まあ、async-fillupがtrueの時は、ほぼ取得できませんでしたので、動作は変わっているとは思いますが。

どちらにせよ、「weakly consistent」と何度もドキュメント中に書いているので、そのあたりは意識して使うんだろうなぁとは思います。

今回は、この辺で。

作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-replicated-map