先ほど、JCacheのEntryProcessorを使って、以下のエントリを書きました。
JCache(Hazelcast)のEntryProcessorで分散処理
http://d.hatena.ne.jp/Kazuhira/20141102/1414921828
が、適用したサンプルが悪いのですが、ちょっと微妙です。いわゆる集計処理になっていますけど、キーの集合を事前に集めていますし、集計処理が完全に浮いてましたし…。
Map<String, EntryProcessorResult<Integer>> results = cache.invokeAll(keys, (entry, arguments) -> { System.out.printf("[%s] key = %s%n", Thread.currentThread().getName(), entry.getKey()); if (entry.exists()) { return entry.getValue() * 2; } else { return 0; } }); int result = results.entrySet() .stream() .mapToInt(e -> e.getValue().get()) .sum();
せっかくなので、ここはHazelcast 3.3から追加されたAggregatorsで書き直してみましょう。
New Features
http://docs.hazelcast.org/docs/3.3/manual/html-single/hazelcast-documentation.html#new-features
Aggregators
http://docs.hazelcast.org/docs/3.3/manual/html-single/hazelcast-documentation.html#aggregators
Aggregatorsは、HazelcastのMap Reduce Framework上に構築されているものになります。
簡単なサンプルは、Hazelcastのドキュメントにあるのでそちらを…。というか、また今度詳しく見るとします。
依存関係の定義
とりあえず、Hazelcast本体。
<dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> <version>3.3.2</version> </dependency>
JCacheは使いません。
今回は、テストコードで表現しようと思うので、JUnitとAssertJを追加。
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>1.7.0</version> <scope>test</scope> </dependency>
クラスタ構成でテストする
今回はHazelcastのAPIを直接使用するので、テストコードでクラスタが組めます。
なので、簡単のために下記のようなインターフェースを用意しました。
src/test/java/org/littlewings/hazelcast/aggregation/HazelcastTestSupport.java
package org.littlewings.hazelcast.aggregation; import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import com.hazelcast.config.Config; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; public interface HazelcastTestSupport { default <R> R withHazelcast(int numInstances, Function<HazelcastInstance, R> fun) { Config config = new Config(); List<HazelcastInstance> hazelcastInstances = IntStream .rangeClosed(1, numInstances) .mapToObj(i -> Hazelcast.newHazelcastInstance(config)) .collect(Collectors.toList()); try { return hazelcastInstances .stream() .findFirst() .map(hazelcast -> fun.apply(hazelcast)) .orElse(null); } finally { hazelcastInstances .stream() .forEach(h -> h.getLifecycleService().shutdown()); Hazelcast.shutdownAll(); } } }
これで、任意の数のNodeを起動して、その中でテストを実行できます。
ぶっちゃけ、インターフェースのデフォルトメソッドが使いたかっただけとも…。あと、Functionを試してみるのにも。ScalaやClojureで、このパターンいっぱい書いたなぁ…。
で、このインターフェースを実装したテストコードを書きます。
src/test/java/org/littlewings/hazelcast/aggregation/SimpleAggregationTest.java
package org.littlewings.hazelcast.aggregation; import static org.assertj.core.api.Assertions.*; import java.io.Serializable; import java.util.Map; import java.util.stream.IntStream; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IMap; import com.hazelcast.mapreduce.aggregation.Aggregations; import com.hazelcast.mapreduce.aggregation.Supplier; import org.junit.Test; public class SimpleAggregationTest implements HazelcastTestSupport { // ここにテストコードを書く! }
まずは、単純に集計。
@Test public void testSimpleAggregation() { int result = withHazelcast(3, hazelcast -> { IMap<String, Integer> map = hazelcast.getMap("default"); IntStream .rangeClosed(1, 20) .forEach(i -> map.put("key" + i, i)); return map.aggregate(Supplier.all(), Aggregations.integerSum()); }); assertThat(result) .isEqualTo(210); }
Supplierで処理対象の決定や、データ抽出方法を決定するのに使用します。Aggregationsは、ここでは直接表れていませんが、Map Reduce FrameworkのMapper、ReducerFactory、CombinerFactory、Collatorを提供するインターフェース、Aggregatorのよく使う実装を提供するクラスになります。
ここでは、全エントリの値を対象に、Sumしましょうと。
これ、テストを実行するとNodeが3つ起動しますよ。
Members [3] { Member [192.168.129.129]:5701 Member [192.168.129.129]:5702 Member [192.168.129.129]:5703 this }
Node数は3つにしたので。
int result = withHazelcast(3, hazelcast -> {
続いて、JCacheの例の時と同じく、値を2倍して集計してみます。
今回は、Supplierを実装します。
public static class DoublingSupplier extends Supplier<String, Integer, Integer> implements Serializable { @Override public Integer apply(Map.Entry<String, Integer> entry) { return entry.getValue() * 2; } }
ここで、値を2倍、と。
で、作成したSupplierを使ったテストコード。
@Test public void testDoublingAggregation() { int result = withHazelcast(3, hazelcast -> { IMap<String, Integer> map = hazelcast.getMap("default"); IntStream .rangeClosed(1, 20) .forEach(i -> map.put("key" + i, i)); return map.aggregate(new DoublingSupplier(), Aggregations.integerSum()); }); assertThat(result) .isEqualTo(420); }
こういうパターンなら、Aggregatorの方がスッキリですな。
今度、また詳しく見てみましょう…。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-simple-aggregation