HazelcastのJCacheのEntryProcessorの部分の実装を見ていて、これって分散実行されそうだなーと思って試したら、案の定だったのでその結果を書いておきます。
JSR-107を読んでみても、「実装がリモートまたは分散キャッシュをサポートする場合は、EntryProcessorはリモートで実行されるよ」的なことが書いていました。なので、これができるかどうかは実装次第ですね。
では、ちょっと書いてみます。
まずは単一Nodeで
Maven依存関係の定義。
<dependency> <groupId>javax.cache</groupId> <artifactId>cache-api</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> <version>3.3.2</version> </dependency>
EntryProcessorのエントリポイントとなるクラスには、以下のimport文などがあるとします。
src/main/java/org/littlewings/hazelcast/jcache/EntryProcessorStarter.java
package org.littlewings.hazelcast.jcache; import java.io.Serializable; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.IntStream; import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.Caching; import javax.cache.configuration.MutableConfiguration; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; import javax.cache.spi.CachingProvider;
コード本体。
public class EntryProcessorStarter { public static void main(String[] args) { try (CachingProvider provider = Caching.getCachingProvider(); CacheManager manager = provider.getCacheManager(); Cache<String, Integer> cache = manager.createCache("entryProcessorCache", new MutableConfiguration<>())) { IntStream .rangeClosed(1, 20) .forEach(i -> cache.put("key" + i, i)); Set<String> keys = new HashSet<>(); cache.forEach(entry -> keys.add(entry.getKey())); Map<String, EntryProcessorResult<Integer>> results = cache.invokeAll(keys, (entry, arguments) -> { System.out.printf("[%s] key = %s%n", Thread.currentThread().getName(), entry.getKey()); if (entry.exists()) { return entry.getValue() * 2; } else { return 0; } }); int result = results.entrySet() .stream() .mapToInt(e -> e.getValue().get()) .sum(); System.out.printf("Result = %d%n", result); } } }
最初にエントリを20個入れて
IntStream .rangeClosed(1, 20) .forEach(i -> cache.put("key" + i, i));
ラムダ式で単純に値を2倍するだけのEntryProcessorです。
Map<String, EntryProcessorResult<Integer>> results = cache.invokeAll(keys, (entry, arguments) -> { System.out.printf("[%s] key = %s%n", Thread.currentThread().getName(), entry.getKey()); if (entry.exists()) { return entry.getValue() * 2; } else { return 0; } });
最後は集計。
int result = results.entrySet() .stream() .mapToInt(e -> e.getValue().get()) .sum();
あまり、意味のある例ではないですが…。
maven-shade-pluginでJARファイルにしたとして、以下のコマンドで上記のクラスを実行して、EntryProcessorを起動します。
$ java -cp target/hazelcast-jcache-0.0.1-SNAPSHOT.jar org.littlewings.hazelcast.jcache.EntryProcessorStarter
EntryProcessor内で、スレッド名とキーを出力しているのですが、Hazelcastの場合は単一Nodeでも複数スレッドで実行されているみたいですね。
[hz.CacheProvider.partition-operation.thread-3] key = key1 [hz.CacheProvider.partition-operation.thread-1] key = key2 [hz.CacheProvider.partition-operation.thread-2] key = key5 [hz.CacheProvider.partition-operation.thread-3] key = key6 [hz.CacheProvider.partition-operation.thread-3] key = key3 [hz.CacheProvider.partition-operation.thread-3] key = key4 [hz.CacheProvider.partition-operation.thread-1] key = key9 [hz.CacheProvider.partition-operation.thread-0] key = key7 [hz.CacheProvider.partition-operation.thread-1] key = key8 [hz.CacheProvider.partition-operation.thread-2] key = key11 [hz.CacheProvider.partition-operation.thread-3] key = key10 [hz.CacheProvider.partition-operation.thread-0] key = key20 [hz.CacheProvider.partition-operation.thread-2] key = key15 [hz.CacheProvider.partition-operation.thread-0] key = key14 [hz.CacheProvider.partition-operation.thread-0] key = key13 [hz.CacheProvider.partition-operation.thread-0] key = key12 [hz.CacheProvider.partition-operation.thread-1] key = key19 [hz.CacheProvider.partition-operation.thread-1] key = key18 [hz.CacheProvider.partition-operation.thread-2] key = key17 [hz.CacheProvider.partition-operation.thread-3] key = key16
結果。
Result = 420
本当に、大したことない例ですね。
分散実行
それでは、今度は複数Nodeで実行してみましょう。分散処理用に浮いていてもらうNodeとして、以下のようなクラスを用意しました。
src/main/java/org/littlewings/hazelcast/jcache/WaitServer.java
package org.littlewings.hazelcast.jcache; import java.util.stream.IntStream; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.Caching; import javax.cache.configuration.MutableConfiguration; import javax.cache.spi.CachingProvider; public class WaitServer { public static void main(String[] args) { try (CachingProvider provider = Caching.getCachingProvider(); CacheManager manager = provider.getCacheManager(); Cache<String, Integer> cache = manager.createCache("entryProcessorCache", new MutableConfiguration<>())) { IntStream .rangeClosed(1, 20) .forEach(i -> cache.put("key" + i, i)); LocalDateTime now = LocalDateTime.now(); System.out.printf("[%s] Wait Cache Server startup.", now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); System.console().readLine(); } } }
データの登録は、こちらで行うようにしました。あと、起動後にEnter打つと終了しますー。
EntryProcessorの起動側の方からは、データの登録はコメントアウトしてしまいましょう。
public class EntryProcessorStarter { public static void main(String[] args) { try (CachingProvider provider = Caching.getCachingProvider(); CacheManager manager = provider.getCacheManager(); Cache<String, Integer> cache = manager.createCache("entryProcessorCache", new MutableConfiguration<>())) { /* IntStream .rangeClosed(1, 20) .forEach(i -> cache.put("key" + i, i)); */
mvn packageして、浮いていてもらうNodeを2つ起動します。
## Node1 $ java -cp target/halcast-jcache-0.0.1-SNAPSHOT.jar org.littlewings.hazelcast.jcache.WaitServer ## Node2 $ java -cp target/halcast-jcache-0.0.1-SNAPSHOT.jar org.littlewings.hazelcast.jcache.WaitServer
ここでNode1とか2とか言っているのは、便宜上の名前です。
少し待っていると、Hazelcastが勝手にクラスタを構成します。
## Node1 Members [2] { Member [192.168.129.129]:5701 this Member [192.168.129.129]:5702 } ## Node2 Members [2] { Member [192.168.129.129]:5701 Member [192.168.129.129]:5702 this }
これは、Hazelcastのデフォルトの設定で動作させています。
続いて、EntryProcessorのエントリポイントを実行。
$ java -cp target/hazelcast-jcache-0.0.1-SNAPSHOT.jar org.littlewings.hazelcast.jcache.EntryProcessorStarter
すると、とりあえずクラスタに参加して
Members [3] { Member [192.168.129.129]:5701 Member [192.168.129.129]:5702 Member [192.168.129.129]:5703 this }
微妙に頑張ろうとするのですが、
[hz.CacheProvider.partition-operation.thread-1] key = key2 [hz.CacheProvider.partition-operation.thread-3] key = key6 [hz.CacheProvider.partition-operation.thread-1] key = key9 [hz.CacheProvider.partition-operation.thread-3] key = key10 [hz.CacheProvider.partition-operation.thread-0] key = key14 [hz.CacheProvider.partition-operation.thread-2] key = key17
最終的に失敗します。
情報: [192.168.129.129]:5703 [dev] [3.3.2] Address[192.168.129.129]:5703 is SHUTDOWN Exception in thread "main" javax.cache.processor.EntryProcessorException: javax.cache.processor.EntryProcessorException: com.hazelcast.nio.serialization.HazelcastSerializationException: There is no suitable serializer for class org.littlewings.hazelcast.jcache.EntryProcessorStarter$$Lambda$3/1961176822
これは、リモート実行する時にEntryProcessorがシリアライズできないからですね。
JSR-107を見ても、EntryProcessorや引数、戻り値はSerializableであることがマナーだよって書かれてありますし…。ラムダ式でやるのは、ホントはよくないようですねー。
というわけで、EntryProcessorを独立して定義します。今回は、staticなインナークラスにしました。Serializableを付けて。
public static class MyEntryProcessor implements EntryProcessor<String, Integer, Integer>, Serializable { @Override public Integer process(MutableEntry<String, Integer> entry, Object... arguments) { System.out.printf("[%s] key = %s%n", Thread.currentThread().getName(), entry.getKey()); if (entry.exists()) { return entry.getValue() * 2; } else { return 0; } } }
EntryProcessorの実行部はこのように置き換えます。
Map<String, EntryProcessorResult<Integer>> results =
cache.invokeAll(keys, new MyEntryProcessor());
では、他のNode2つをもう1度起動しておいて、再度EntryProcessorを実行します。
$ java -cp target/hazelcast-jcache-0.0.1-SNAPSHOT.jar org.littlewings.hazelcast.jcache.EntryProcessorStarter
クラスタに参加して
Members [3] { Member [192.168.129.129]:5701 Member [192.168.129.129]:5702 Member [192.168.129.129]:5703 this }
それぞれ、こんな出力がNodeごとに表示されます。
## Node1 [hz.CacheProvider.partition-operation.thread-3] key = key3 [hz.CacheProvider.partition-operation.thread-0] key = key7 [hz.CacheProvider.partition-operation.thread-2] key = key11 [hz.CacheProvider.partition-operation.thread-1] key = key19 ## Node2 [hz.CacheProvider.partition-operation.thread-3] key = key1 [hz.CacheProvider.partition-operation.thread-3] key = key4 [hz.CacheProvider.partition-operation.thread-1] key = key8 [hz.CacheProvider.partition-operation.thread-0] key = key20 [hz.CacheProvider.partition-operation.thread-0] key = key12 [hz.CacheProvider.partition-operation.thread-1] key = key18 [hz.CacheProvider.partition-operation.thread-3] key = key16 ## EntryProcessorの実行側 [hz.CacheProvider.partition-operation.thread-1] key = key2 [hz.CacheProvider.partition-operation.thread-2] key = key5 [hz.CacheProvider.partition-operation.thread-3] key = key6 [hz.CacheProvider.partition-operation.thread-1] key = key9 [hz.CacheProvider.partition-operation.thread-3] key = key10 [hz.CacheProvider.partition-operation.thread-2] key = key15 [hz.CacheProvider.partition-operation.thread-0] key = key14 [hz.CacheProvider.partition-operation.thread-0] key = key13 [hz.CacheProvider.partition-operation.thread-2] key = key17
で、ちゃんと結果が呼び出し元で得られます。
Result = 420
分散実行できましたね!
com.hazelcast.cache.BackupAwareEntryProcessor
Hazelcastは、バックアップ用のNode向けに、EntryProcessorを拡張したBackupAwareEntryProcessorを提供する予定のようです。3.3.2の時点ではまだ存在していないみたいですが、3.4向けにはすでに存在しており、ドキュメントへの記載も行われています。
というか、Hazelcast自身が持っていたEntryProcessorも、そんな感じの構成でしたね。
Entry Processor
http://docs.hazelcast.org/docs/3.3/manual/html-single/hazelcast-documentation.html#entry-processor
GitHubの更新状態を見ていると、JCacheの部分は頻繁に更新されていますし、Cacheの設定等もできるようになるようなので、バージョンアップでさらに変わるでしょうね。
ちょこちょことチェックしていこうと思います。
最後に、今回作成したコードですが、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-jcache-entryprocessor
ところで、EntryProcessorって事前に対象のキーを知らないといけないんですけど、こういうのってどうやって皆さん活用してるのかなぁ…。今回の例みたいに、最初にキーを集めてたら微妙なのでは。
Map/Reduceの方を使うのかな。
それとも、あらかじめ処理対象のキーが分かっている時に活用することが多いのかな。
追記)
今回も、書いたら速攻で開発者の方にコメントをいただきました。
新しいドキュメントのことと
@kazuhira_r here’s the new version of the @hazelcast JCache docs. We now also have BackupAwareEntryProcessor (URL
2014-11-02 19:15:19 via TweetDeck to @kazuhira_r
BackupAwareEntryProcessorについて
@kazuhira_r @hazelcast it offers the possibility to apply delta updates on backup nodes instead of copying the whole object over :)
2014-11-02 19:15:55 via TweetDeck to @noctarius2k
あと、自分がこうやって日本語でHazelcastのエントリを書いていることを、歓迎してくれているようです。
@kazuhira_r As always, your welcome :) We really appreciates your effort on writing Japanese tutorials on @hazelcast! :)
2014-11-02 19:22:58 via TweetDeck to @kazuhira_r
これは、嬉しいですねー。