CLOVER🍀

That was when it all began.

さっきのJCacheの例を、HazelcastのAggregatorsで書き直してみたよ

先ほど、JCacheのEntryProcessorを使って、以下のエントリを書きました。

JCache(Hazelcast)のEntryProcessorで分散処理
http://d.hatena.ne.jp/Kazuhira/20141102/1414921828

が、適用したサンプルが悪いのですが、ちょっと微妙です。いわゆる集計処理になっていますけど、キーの集合を事前に集めていますし、集計処理が完全に浮いてましたし…。

            Map<String, EntryProcessorResult<Integer>> results =
                cache.invokeAll(keys, (entry, arguments) -> {
                    System.out.printf("[%s] key = %s%n", Thread.currentThread().getName(), entry.getKey());

                    if (entry.exists()) {
                        return entry.getValue() * 2;
                    } else {
                        return 0;
                    }
                });

            int result = results.entrySet()
                .stream()
                .mapToInt(e -> e.getValue().get())
                .sum();

せっかくなので、ここはHazelcast 3.3から追加されたAggregatorsで書き直してみましょう。

New Features
http://docs.hazelcast.org/docs/3.3/manual/html-single/hazelcast-documentation.html#new-features

Aggregators
http://docs.hazelcast.org/docs/3.3/manual/html-single/hazelcast-documentation.html#aggregators

Aggregatorsは、HazelcastのMap Reduce Framework上に構築されているものになります。

簡単なサンプルは、Hazelcastのドキュメントにあるのでそちらを…。というか、また今度詳しく見るとします。

依存関係の定義

とりあえず、Hazelcast本体。

    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast</artifactId>
      <version>3.3.2</version>
    </dependency>

JCacheは使いません。

今回は、テストコードで表現しようと思うので、JUnitとAssertJを追加。

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.assertj</groupId>
      <artifactId>assertj-core</artifactId>
      <version>1.7.0</version>
      <scope>test</scope>
    </dependency>

クラスタ構成でテストする

今回はHazelcastのAPIを直接使用するので、テストコードでクラスタが組めます。

なので、簡単のために下記のようなインターフェースを用意しました。
src/test/java/org/littlewings/hazelcast/aggregation/HazelcastTestSupport.java

package org.littlewings.hazelcast.aggregation;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public interface HazelcastTestSupport {
    default <R> R withHazelcast(int numInstances, Function<HazelcastInstance, R> fun) {
        Config config = new Config();

        List<HazelcastInstance> hazelcastInstances = IntStream
            .rangeClosed(1, numInstances)
            .mapToObj(i -> Hazelcast.newHazelcastInstance(config))
            .collect(Collectors.toList());

        try {
            return hazelcastInstances
                .stream()
                .findFirst()
                .map(hazelcast -> fun.apply(hazelcast))
                .orElse(null);
        } finally {
            hazelcastInstances
                .stream()
                .forEach(h -> h.getLifecycleService().shutdown());

            Hazelcast.shutdownAll();
        }
    }
}

これで、任意の数のNodeを起動して、その中でテストを実行できます。

ぶっちゃけ、インターフェースのデフォルトメソッドが使いたかっただけとも…。あと、Functionを試してみるのにも。ScalaClojureで、このパターンいっぱい書いたなぁ…。

で、このインターフェースを実装したテストコードを書きます。
src/test/java/org/littlewings/hazelcast/aggregation/SimpleAggregationTest.java

package org.littlewings.hazelcast.aggregation;

import static org.assertj.core.api.Assertions.*;

import java.io.Serializable;
import java.util.Map;
import java.util.stream.IntStream;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.mapreduce.aggregation.Aggregations;
import com.hazelcast.mapreduce.aggregation.Supplier;

import org.junit.Test;

public class SimpleAggregationTest implements HazelcastTestSupport {
    // ここにテストコードを書く!
}

まずは、単純に集計。

    @Test
    public void testSimpleAggregation() {
        int result = withHazelcast(3, hazelcast -> {
            IMap<String, Integer> map = hazelcast.getMap("default");

            IntStream
                .rangeClosed(1, 20)
                .forEach(i -> map.put("key" + i, i));

            return map.aggregate(Supplier.all(), Aggregations.integerSum());
        });

        assertThat(result)
            .isEqualTo(210);
    }

Supplierで処理対象の決定や、データ抽出方法を決定するのに使用します。Aggregationsは、ここでは直接表れていませんが、Map Reduce FrameworkのMapper、ReducerFactory、CombinerFactory、Collatorを提供するインターフェース、Aggregatorのよく使う実装を提供するクラスになります。

ここでは、全エントリの値を対象に、Sumしましょうと。

これ、テストを実行するとNodeが3つ起動しますよ。

Members [3] {
	Member [192.168.129.129]:5701
	Member [192.168.129.129]:5702
	Member [192.168.129.129]:5703 this
}

Node数は3つにしたので。

        int result = withHazelcast(3, hazelcast -> {

続いて、JCacheの例の時と同じく、値を2倍して集計してみます。

今回は、Supplierを実装します。

    public static class DoublingSupplier extends Supplier<String, Integer, Integer> implements Serializable {
        @Override
        public Integer apply(Map.Entry<String, Integer> entry) {
            return entry.getValue() * 2;
        }
    }

ここで、値を2倍、と。

で、作成したSupplierを使ったテストコード。

    @Test
    public void testDoublingAggregation() {
        int result = withHazelcast(3, hazelcast -> {
            IMap<String, Integer> map = hazelcast.getMap("default");

            IntStream
                .rangeClosed(1, 20)
                .forEach(i -> map.put("key" + i, i));

            return map.aggregate(new DoublingSupplier(), Aggregations.integerSum());
        });

        assertThat(result)
            .isEqualTo(420);
    }

こういうパターンなら、Aggregatorの方がスッキリですな。

今度、また詳しく見てみましょう…。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-simple-aggregation