CLOVER🍀

That was when it all began.

JCache(Hazelcast)のEntryProcessorで分散処理

HazelcastのJCacheのEntryProcessorの部分の実装を見ていて、これって分散実行されそうだなーと思って試したら、案の定だったのでその結果を書いておきます。

https://github.com/hazelcast/hazelcast/blob/v3.3.2/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheProxy.java#L219

JSR-107を読んでみても、「実装がリモートまたは分散キャッシュをサポートする場合は、EntryProcessorはリモートで実行されるよ」的なことが書いていました。なので、これができるかどうかは実装次第ですね。

では、ちょっと書いてみます。

まずは単一Nodeで

Maven依存関係の定義。

    <dependency>
      <groupId>javax.cache</groupId>
      <artifactId>cache-api</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast</artifactId>
      <version>3.3.2</version>
    </dependency>

EntryProcessorのエントリポイントとなるクラスには、以下のimport文などがあるとします。
src/main/java/org/littlewings/hazelcast/jcache/EntryProcessorStarter.java

package org.littlewings.hazelcast.jcache;

import java.io.Serializable;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import javax.cache.spi.CachingProvider;

コード本体。

public class EntryProcessorStarter {
    public static void main(String[] args) {
        try (CachingProvider provider = Caching.getCachingProvider();
             CacheManager manager = provider.getCacheManager();
             Cache<String, Integer> cache = manager.createCache("entryProcessorCache",
                                                                new MutableConfiguration<>())) {
            IntStream
                .rangeClosed(1, 20)
                .forEach(i -> cache.put("key" + i, i));

            Set<String> keys = new HashSet<>();
            cache.forEach(entry -> keys.add(entry.getKey()));

            Map<String, EntryProcessorResult<Integer>> results =
                cache.invokeAll(keys, (entry, arguments) -> {
                    System.out.printf("[%s] key = %s%n", Thread.currentThread().getName(), entry.getKey());

                    if (entry.exists()) {
                        return entry.getValue() * 2;
                    } else {
                        return 0;
                    }
                });

            int result = results.entrySet()
                .stream()
                .mapToInt(e -> e.getValue().get())
                .sum();
            
            System.out.printf("Result = %d%n", result);
        }
    }
}

最初にエントリを20個入れて

            IntStream
                .rangeClosed(1, 20)
                .forEach(i -> cache.put("key" + i, i));

ラムダ式で単純に値を2倍するだけのEntryProcessorです。

            Map<String, EntryProcessorResult<Integer>> results =
                cache.invokeAll(keys, (entry, arguments) -> {
                    System.out.printf("[%s] key = %s%n", Thread.currentThread().getName(), entry.getKey());

                    if (entry.exists()) {
                        return entry.getValue() * 2;
                    } else {
                        return 0;
                    }
                });

最後は集計。

            int result = results.entrySet()
                .stream()
                .mapToInt(e -> e.getValue().get())
                .sum();

あまり、意味のある例ではないですが…。

maven-shade-pluginでJARファイルにしたとして、以下のコマンドで上記のクラスを実行して、EntryProcessorを起動します。

$ java -cp target/hazelcast-jcache-0.0.1-SNAPSHOT.jar org.littlewings.hazelcast.jcache.EntryProcessorStarter

EntryProcessor内で、スレッド名とキーを出力しているのですが、Hazelcastの場合は単一Nodeでも複数スレッドで実行されているみたいですね。

[hz.CacheProvider.partition-operation.thread-3] key = key1
[hz.CacheProvider.partition-operation.thread-1] key = key2
[hz.CacheProvider.partition-operation.thread-2] key = key5
[hz.CacheProvider.partition-operation.thread-3] key = key6
[hz.CacheProvider.partition-operation.thread-3] key = key3
[hz.CacheProvider.partition-operation.thread-3] key = key4
[hz.CacheProvider.partition-operation.thread-1] key = key9
[hz.CacheProvider.partition-operation.thread-0] key = key7
[hz.CacheProvider.partition-operation.thread-1] key = key8
[hz.CacheProvider.partition-operation.thread-2] key = key11
[hz.CacheProvider.partition-operation.thread-3] key = key10
[hz.CacheProvider.partition-operation.thread-0] key = key20
[hz.CacheProvider.partition-operation.thread-2] key = key15
[hz.CacheProvider.partition-operation.thread-0] key = key14
[hz.CacheProvider.partition-operation.thread-0] key = key13
[hz.CacheProvider.partition-operation.thread-0] key = key12
[hz.CacheProvider.partition-operation.thread-1] key = key19
[hz.CacheProvider.partition-operation.thread-1] key = key18
[hz.CacheProvider.partition-operation.thread-2] key = key17
[hz.CacheProvider.partition-operation.thread-3] key = key16

結果。

Result = 420

本当に、大したことない例ですね。

分散実行

それでは、今度は複数Nodeで実行してみましょう。分散処理用に浮いていてもらうNodeとして、以下のようなクラスを用意しました。
src/main/java/org/littlewings/hazelcast/jcache/WaitServer.java

package org.littlewings.hazelcast.jcache;

import java.util.stream.IntStream;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

public class WaitServer {
    public static void main(String[] args) {
        try (CachingProvider provider = Caching.getCachingProvider();
             CacheManager manager = provider.getCacheManager();
             Cache<String, Integer> cache = manager.createCache("entryProcessorCache",
                                                                new MutableConfiguration<>())) {
            IntStream
                .rangeClosed(1, 20)
                .forEach(i -> cache.put("key" + i, i));

            LocalDateTime now  = LocalDateTime.now();

            System.out.printf("[%s] Wait Cache Server startup.", now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            System.console().readLine();
        }
    }
}

データの登録は、こちらで行うようにしました。あと、起動後にEnter打つと終了しますー。

EntryProcessorの起動側の方からは、データの登録はコメントアウトしてしまいましょう。

public class EntryProcessorStarter {
    public static void main(String[] args) {
        try (CachingProvider provider = Caching.getCachingProvider();
             CacheManager manager = provider.getCacheManager();
             Cache<String, Integer> cache = manager.createCache("entryProcessorCache",
                                                                new MutableConfiguration<>())) {
            /*
            IntStream
                .rangeClosed(1, 20)
                .forEach(i -> cache.put("key" + i, i));
            */

mvn packageして、浮いていてもらうNodeを2つ起動します。

## Node1
$ java -cp target/halcast-jcache-0.0.1-SNAPSHOT.jar org.littlewings.hazelcast.jcache.WaitServer

## Node2
$ java -cp target/halcast-jcache-0.0.1-SNAPSHOT.jar org.littlewings.hazelcast.jcache.WaitServer

ここでNode1とか2とか言っているのは、便宜上の名前です。

少し待っていると、Hazelcastが勝手にクラスタを構成します。

## Node1
Members [2] {
	Member [192.168.129.129]:5701 this
	Member [192.168.129.129]:5702
}

## Node2
Members [2] {
	Member [192.168.129.129]:5701
	Member [192.168.129.129]:5702 this
}

これは、Hazelcastのデフォルトの設定で動作させています。

続いて、EntryProcessorのエントリポイントを実行。

$ java -cp target/hazelcast-jcache-0.0.1-SNAPSHOT.jar org.littlewings.hazelcast.jcache.EntryProcessorStarter

すると、とりあえずクラスタに参加して

Members [3] {
	Member [192.168.129.129]:5701
	Member [192.168.129.129]:5702
	Member [192.168.129.129]:5703 this
}

微妙に頑張ろうとするのですが、

[hz.CacheProvider.partition-operation.thread-1] key = key2
[hz.CacheProvider.partition-operation.thread-3] key = key6
[hz.CacheProvider.partition-operation.thread-1] key = key9
[hz.CacheProvider.partition-operation.thread-3] key = key10
[hz.CacheProvider.partition-operation.thread-0] key = key14
[hz.CacheProvider.partition-operation.thread-2] key = key17

最終的に失敗します。

情報: [192.168.129.129]:5703 [dev] [3.3.2] Address[192.168.129.129]:5703 is SHUTDOWN
Exception in thread "main" javax.cache.processor.EntryProcessorException: javax.cache.processor.EntryProcessorException: com.hazelcast.nio.serialization.HazelcastSerializationException: There is no suitable serializer for class org.littlewings.hazelcast.jcache.EntryProcessorStarter$$Lambda$3/1961176822

これは、リモート実行する時にEntryProcessorがシリアライズできないからですね。

JSR-107を見ても、EntryProcessorや引数、戻り値はSerializableであることがマナーだよって書かれてありますし…。ラムダ式でやるのは、ホントはよくないようですねー。

というわけで、EntryProcessorを独立して定義します。今回は、staticなインナークラスにしました。Serializableを付けて。

    public static class MyEntryProcessor implements EntryProcessor<String, Integer, Integer>, Serializable {
        @Override
        public Integer process(MutableEntry<String, Integer> entry, Object... arguments) {
            System.out.printf("[%s] key = %s%n", Thread.currentThread().getName(), entry.getKey());

            if (entry.exists()) {
                return entry.getValue() * 2;
            } else {
                return 0;
            }
        }
    }

EntryProcessorの実行部はこのように置き換えます。

            Map<String, EntryProcessorResult<Integer>> results =
                cache.invokeAll(keys, new MyEntryProcessor());

では、他のNode2つをもう1度起動しておいて、再度EntryProcessorを実行します。

$ java -cp target/hazelcast-jcache-0.0.1-SNAPSHOT.jar org.littlewings.hazelcast.jcache.EntryProcessorStarter

クラスタに参加して

Members [3] {
	Member [192.168.129.129]:5701
	Member [192.168.129.129]:5702
	Member [192.168.129.129]:5703 this
}

それぞれ、こんな出力がNodeごとに表示されます。

## Node1
[hz.CacheProvider.partition-operation.thread-3] key = key3
[hz.CacheProvider.partition-operation.thread-0] key = key7
[hz.CacheProvider.partition-operation.thread-2] key = key11
[hz.CacheProvider.partition-operation.thread-1] key = key19

## Node2
[hz.CacheProvider.partition-operation.thread-3] key = key1
[hz.CacheProvider.partition-operation.thread-3] key = key4
[hz.CacheProvider.partition-operation.thread-1] key = key8
[hz.CacheProvider.partition-operation.thread-0] key = key20
[hz.CacheProvider.partition-operation.thread-0] key = key12
[hz.CacheProvider.partition-operation.thread-1] key = key18
[hz.CacheProvider.partition-operation.thread-3] key = key16

## EntryProcessorの実行側
[hz.CacheProvider.partition-operation.thread-1] key = key2
[hz.CacheProvider.partition-operation.thread-2] key = key5
[hz.CacheProvider.partition-operation.thread-3] key = key6
[hz.CacheProvider.partition-operation.thread-1] key = key9
[hz.CacheProvider.partition-operation.thread-3] key = key10
[hz.CacheProvider.partition-operation.thread-2] key = key15
[hz.CacheProvider.partition-operation.thread-0] key = key14
[hz.CacheProvider.partition-operation.thread-0] key = key13
[hz.CacheProvider.partition-operation.thread-2] key = key17

で、ちゃんと結果が呼び出し元で得られます。

Result = 420

分散実行できましたね!

com.hazelcast.cache.BackupAwareEntryProcessor

Hazelcastは、バックアップ用のNode向けに、EntryProcessorを拡張したBackupAwareEntryProcessorを提供する予定のようです。3.3.2の時点ではまだ存在していないみたいですが、3.4向けにはすでに存在しており、ドキュメントへの記載も行われています。

というか、Hazelcast自身が持っていたEntryProcessorも、そんな感じの構成でしたね。

Entry Processor
http://docs.hazelcast.org/docs/3.3/manual/html-single/hazelcast-documentation.html#entry-processor

GitHubの更新状態を見ていると、JCacheの部分は頻繁に更新されていますし、Cacheの設定等もできるようになるようなので、バージョンアップでさらに変わるでしょうね。

ちょこちょことチェックしていこうと思います。

最後に、今回作成したコードですが、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-jcache-entryprocessor

ところで、EntryProcessorって事前に対象のキーを知らないといけないんですけど、こういうのってどうやって皆さん活用してるのかなぁ…。今回の例みたいに、最初にキーを集めてたら微妙なのでは。

Map/Reduceの方を使うのかな。

それとも、あらかじめ処理対象のキーが分かっている時に活用することが多いのかな。

追記
今回も、書いたら速攻で開発者の方にコメントをいただきました。

新しいドキュメントのことと

BackupAwareEntryProcessorについて

あと、自分がこうやって日本語でHazelcastのエントリを書いていることを、歓迎してくれているようです。

これは、嬉しいですねー。