CLOVER🍀

That was when it all began.

HazelcastのAggregatorsを試してみる

この前、JCacheで書いたサンプルコードをHazelcastのAggregatorsで書き直してみたのですが、せっかくなので1度ちゃんとAggregatorsを見ておこうかと思いまして。

Aggregators
http://docs.hazelcast.org/docs/3.3/manual/html-single/hazelcast-documentation.html#aggregators

AggregatorsはHazelcast 3.3からの新機能で、既存のMap Reduce Framework上に構築されたものになります。集計、最大・最小値を求める、平均値を求めるなどの用途で使われることを狙ったもののようです。

また、標準的な実装はcom.hazelcast.mapreduce.aggregation.Aggregationsクラスやcom.hazelcast.mapreduce.aggregation.Supplierクラスにて提供されているので、簡単なものならすぐに使うことができます。

基本的な考え方

HazelcastのMap Reduce Frameworkでは、MapやSet、Listなどの各種データ構造を起点としてMap Reduceを起動することができます。

これに対して、AggregatorsはIMap#aggregateまたはMultiMap#aggregateを起点に起動する操作になります。よって、Aggregatorsが使用できるのは、Distributed MapおよびMultiMapになります。

また、主な構成要素ですが、Aggregatorに値を提供するSupplier、条件を絞り込むPredicateまたはKeyPredicate、Map Reduce関連のクラスのファクトリとなるAggregation、そしてJobTrackerで構成されます。

これらの構成要素を使用して、MapperフェーズでSupplierおよびPredicateを使用して対象に対して絞り込みや演算を実行、その後CombinerおよびReducerで畳み込み、Collatorで最終結果を出すといった流れになります。このあたりは、Map Reduceそのままですね。よって、ジョブの定義をJobTrackerで行うことになります。

簡単に使うなら、ほぼ用意されたもので書くことができます。

まずは、使ってみましょう。

準備

まずはMaven依存関係。Hazelcast本体があれば、十分です。

    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast</artifactId>
      <version>3.3.2</version>
    </dependency>

また、テスト用にJUnitとAssertJも使用します。

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.assertj</groupId>
      <artifactId>assertj-core</artifactId>
      <version>1.7.0</version>
      <scope>test</scope>
    </dependency>

テストコードの基底の用意

とりあえず、Hazelcastのテストを簡易に実行するために、以下のようなクラスタを簡単に構成するインターフェースを作成しました。各種テストコードは、このインターフェースを実装する想定です。
src/test/java/org/littlewings/hazelcast/aggregators/HazelcastTestSupport.java

package org.littlewings.hazelcast.aggregators;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

interface HazelcastTestSupport {
    default <R> R withHazelcast(int numInstances, String xmlConfigFileName, Function<HazelcastInstance, R> fun) {
        return withHazelcast(numInstances, new ClasspathXmlConfig(xmlConfigFileName), fun);
    }

    default <R> R withHazelcast(int numInstances, Function<HazelcastInstance, R> fun) {
        return withHazelcast(numInstances, (Config) null, fun);
    }

    default <R> R withHazelcast(int numInstances, Config config, Function<HazelcastInstance, R> fun) {
        Config c;
        if (config == null) {
            c = new Config();
        } else {
            c = config;
        }

        List<HazelcastInstance> hazelcastInstances =
            IntStream
                .rangeClosed(1, numInstances)
                .mapToObj(i -> Hazelcast.newHazelcastInstance(c))
                .collect(Collectors.toList());

        try {
            return fun.apply(hazelcastInstances.get(0));
        } finally {
            hazelcastInstances
                .stream()
                .forEach(h -> h.getLifecycleService().shutdown());

            Hazelcast.shutdownAll();
        }
    }
}

いくつかオーバーロードされたものがありますが、以下を使用するとクラスパスから設定ファイルを読み込みます。

    default <R> R withHazelcast(int numInstances, String xmlConfigFileName, Function<HazelcastInstance, R> fun) {
        return withHazelcast(numInstances, new ClasspathXmlConfig(xmlConfigFileName), fun);
    }

標準的なオペレーションを試してみる

それでは、以下の2つのクラスを使って、標準的に提供されているオペレーションを試してみましょう。

  • com.hazelcast.mapreduce.aggregation.Supplier
  • com.hazelcast.mapreduce.aggregation.Aggregations

ひとまず、テストクラスの雛形を用意。
src/test/java/org/littlewings/hazelcast/aggregators/HazelcastSimpleAggregatorTest.java

package org.littlewings.hazelcast.aggregators;

import static org.assertj.core.api.Assertions.*;

import java.util.stream.IntStream;

import com.hazelcast.core.IMap;
import com.hazelcast.mapreduce.aggregation.Aggregations;
import com.hazelcast.mapreduce.aggregation.Supplier;

import org.junit.Test;

public class HazelcastSimpleAggregatorTest implements HazelcastTestSupport {
    // ここに、テストを書く!
}

簡単のために、Map<String, Integer>を集計するテストを行うとします。

単純な例。

    @Test
    public void testSimpleAggregator() {
        int result = withHazelcast(1, hazelcast -> {
            IMap<String, Integer> map = hazelcast.getMap("default");

            IntStream.rangeClosed(1, 20).forEach(i -> map.put("key" + i, i));

            return map.aggregate(Supplier.all(), Aggregations.integerSum());
        });

        assertThat(result)
            .isEqualTo(210);
    }

Distributed Mapに対して、key1,key2,...,key20に対して、1,2,...,20と値を登録していきます。

            IntStream.rangeClosed(1, 20).forEach(i -> map.put("key" + i, i));

これに対して、SupplierとAggregationsに登録されているメソッドを使い、IMap#aggregateを呼び出します。

            return map.aggregate(Supplier.all(), Aggregations.integerSum());

これだけで、DistirbutedMapに登録されている「値」の部分の和が計算できています。

        assertThat(result)
            .isEqualTo(210);

Supplierは抽象クラスなのですが、ファクトリメソッドも提供しており、全件を対象とするSupplierを提供するメソッド、Predicate、KeyPredicateを使用して絞り込みも行うSupplierを提供するメソッドを持っています。今回はSupplier#allを使用したので、全件が対象です。

対するAggregationsは、よく使用されそうなオペレーションがまとめられたクラスになります。Integer、Long、Double、BigInteger、BigDecimal、Comparableに対する和、最大・最小値、平均値、カウント、Distinctなどができるようです。

今回は、Integerの和を求めることにしています。

その他の例。

値を2倍して、和を求めます。

    @Test
    public void testSimpleDoublingAggregator() {
        int result = withHazelcast(2, hazelcast -> {
            IMap<String, Integer> map = hazelcast.getMap("default");

            IntStream.rangeClosed(1, 20).forEach(i -> map.put("key" + i, i));

            return map.aggregate(Supplier.all(value -> value * 2),
                                 Aggregations.integerSum());
        });

        assertThat(result)
            .isEqualTo(420);
    }

Supplier#allにラムダ式を与えることで、値を2倍して処理を行うSupplierを得ています。

偶数のみを対象にして、和を求める例。

    @Test
    public void testEvenAggregator() {
        int result = withHazelcast(1, hazelcast -> {
            IMap<String, Integer> map = hazelcast.getMap("default");

            IntStream.rangeClosed(1, 20).forEach(i -> map.put("key" + i, i));

            return map.aggregate(Supplier.fromPredicate(entry -> entry.getValue()  % 2 == 0),
                                 Aggregations.integerSum());
        });

        assertThat(result)
            .isEqualTo(110);
    }

Supplier#fromPredicateにラムダ式を与えることで、偶数の値を対象とするSupplierを作成します。なお、ラムダ式で実装するPredicate#applyメソッドの引数は、Map.Entryです。

偶数を対象にしつつ、かつ値を2倍して和を求める例。

    @Test
    public void testEvenDoublingAggregator() {
        int result = withHazelcast(1, hazelcast -> {
            IMap<String, Integer> map = hazelcast.getMap("default");

            IntStream.rangeClosed(1, 20).forEach(i -> map.put("key" + i, i));

            return map.aggregate(Supplier.fromPredicate(entry -> entry.getValue()  % 2 == 0,
                                                        Supplier.all(value -> value * 2)),
                                 Aggregations.integerSum());
        });

        assertThat(result)
            .isEqualTo(220);
    }

Supplier#fromPredicateにラムダ式で絞り込みの条件を与え、第2引数でSupplierを与えることで、絞り込みを行いつつ値を2倍するSupplierを得ています。

Supplier#fromKeyPredicateを使用する例は今回は使いませんでしたが、Map.Entryを対象とするPredicateに対して、絞り込みをキーのみで判断するのがKeyPredicateになります。

基本的には、こんな感じの使い方をするようです。

自分でSupplier、Predicate、Aggregationを実装する

それでは、もうちょっとAggregatorsを理解するために、これらの要素を自分で実装してみましょう。

目標は、以下のケース(偶数で絞り込み、値を2倍する)と同等のSupplier/Aggregationを実装するものとします。

    @Test
    public void testEvenDoublingAggregator() {
        int result = withHazelcast(1, hazelcast -> {
            IMap<String, Integer> map = hazelcast.getMap("default");

            IntStream.rangeClosed(1, 20).forEach(i -> map.put("key" + i, i));

            return map.aggregate(Supplier.fromPredicate(entry -> entry.getValue()  % 2 == 0,
                                                        Supplier.all(value -> value * 2)),
                                 Aggregations.integerSum());
        });

        assertThat(result)
            .isEqualTo(220);
    }

また、簡単のためキーや値の型はStringとIntegerで固定することにします。

まずはSupplier。これは簡単で、先ほどのラムダ式の部分をクラスに切り出しただけですね。
src/main/java/org/littlewings/hazelcast/aggregators/DoublingSupplier.java

package org.littlewings.hazelcast.aggregators;

import java.util.Map;

import com.hazelcast.mapreduce.aggregation.Supplier;

public class DoublingSupplier extends Supplier<String, Integer, Integer> {
    @Override
    public Integer apply(Map.Entry<String, Integer> entry) {
        return entry.getValue() * 2;
    }
}

続いて、Predicate。こちらも同様。
src/main/java/org/littlewings/hazelcast/aggregators/EvenPredicate.java

package org.littlewings.hazelcast.aggregators;

import java.util.Map;

import com.hazelcast.query.Predicate;

public class EvenPredicate implements Predicate<String, Integer> {
    @Override
    public boolean apply(Map.Entry<String, Integer> entry) {
        return entry.getValue() % 2 == 0;
    }
}

そして、Aggregationを実装します。実装する前に、まずはAggregationインターフェースで実装すべきメソッドを確認しましょう。

public interface Aggregation<Key,Supplied,Result> {
    Collator<Map.Entry,Result> getCollator();

    Mapper getMapper(Supplier<Key,?,Supplied> supplier);

    CombinerFactory getCombinerFactory();

    ReducerFactory getReducerFactory();
}

見ての通り、Map Reduceに関連するクラスを返す、ファクトリとしての役割のようです。Mapperを作成するところのみ、Supplierが引数になるようになっていますね。

ここで、Distributed Mapでaggregateメソッドが実装されている、以下のコードを見ています。
https://github.com/hazelcast/hazelcast/blob/v3.3.2/hazelcast/src/main/java/com/hazelcast/map/proxy/MapProxyImpl.java#L660

見ての通り、Map ReduceのJobを構成して実行しているだけになります。

            KeyValueSource<K, V> keyValueSource = KeyValueSource.fromMap(this);
            Job<K, V> job = jobTracker.newJob(keyValueSource);
            Mapper mapper = aggregation.getMapper(supplier);
            CombinerFactory combinerFactory = aggregation.getCombinerFactory();
            ReducerFactory reducerFactory = aggregation.getReducerFactory();
            Collator collator = aggregation.getCollator();

            MappingJob mappingJob = job.mapper(mapper);
            ReducingSubmittableJob reducingJob;
            if (combinerFactory == null) {
                reducingJob = mappingJob.reducer(reducerFactory);
            } else {
                reducingJob = mappingJob.combiner(combinerFactory).reducer(reducerFactory);
            }

            ICompletableFuture<Result> future = reducingJob.submit(collator);
            return future.get();

1度中を見ると、用意するものがわかりやすいですね。また、Combinerを実装することが必須でないことも分かります。

上記はJobTrackerを引数に取る定義なのですが、最初にテストコードをして使った方では、JobTrackerは引数に取りませんでした。この場合、どうなっているかというと

        HazelcastInstance hazelcastInstance = getNodeEngine().getHazelcastInstance();
        JobTracker jobTracker = hazelcastInstance.getJobTracker("hz::aggregation-map-" + getName());
        return aggregate(supplier, aggregation, jobTracker);

のように、Distributed Mapの名前からPrefixが固定のJobTrackerが取得されることになります。

このあたりは、ドキュメントの「Aggregation Configuration」に記載があります。設定したい場合は、Prefixに従った名前のJobTrackerをあらかじめ定義しておくか、自分でJobTrackerをaggregateの引数に与えることになるでしょう。

で、ここまでを踏まえてMapperを作成。
src/main/java/org/littlewings/hazelcast/aggregators/SupplierDelegateMapper.java

package org.littlewings.hazelcast.aggregators;

import java.util.Map;

import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.aggregation.Supplier;

public class SupplierDelegateMapper implements Mapper<String, Integer, String, Integer> {
    private Supplier<String, ?, Integer> supplier;

    public SupplierDelegateMapper(Supplier<String, ?, Integer> supplier) {
        this.supplier = supplier;
    }

    @Override
    public void map(String key, Integer value, Context<String, Integer> context) {
        AbstractMap.SimpleEntry entry = new AbstractMap.SimpleEntry(key, value);
        Integer valueOut = (Integer) supplier.apply((Map.Entry) entry);

        if (valueOut != null) {
            context.emit(key, valueOut);
        }
    }
}

Mapperの実装自体は非常に単純で、Context#emitする時にSupplier#applyで値を決定するところがポイントになるくらいです。ただ、Supplier#applyの引数はMap.Entryのため、何らかのMap.Entryの実装が必要ですけれど…。

今回は簡単のため、java.util.AbstractMap.SimpleEntryを使用しました。Hazelcastの場合は、自分自身で簡単なMap.Entryの実装を持っているようです。

ReducerおよびCombinerは単純なので、さらっと。
src/main/java/org/littlewings/hazelcast/aggregators/IntegerSumReducerFactory.java

package org.littlewings.hazelcast.aggregators;

import com.hazelcast.mapreduce.Reducer;
import com.hazelcast.mapreduce.ReducerFactory;

public class IntegerSumReducerFactory implements ReducerFactory<String, Integer, Integer> {
    @Override
    public Reducer<Integer, Integer> newReducer(String key) {
        return new IntegerSumReducer();
    }

    private static class IntegerSumReducer extends Reducer<Integer, Integer> {
        private int sum;

        @Override
        public void reduce(Integer value) {
            sum += value;
        }

        @Override
        public Integer finalizeReduce() {
            return sum;
        }
    }
}

Combiner側。
src/main/java/org/littlewings/hazelcast/aggregators/IntegerSumCombinerFactory.java

package org.littlewings.hazelcast.aggregators;

import com.hazelcast.mapreduce.Combiner;
import com.hazelcast.mapreduce.CombinerFactory;

public class IntegerSumCombinerFactory implements CombinerFactory<String, Integer, Integer> {
    @Override
    public Combiner<Integer, Integer> newCombiner(String key) {
        return new IntegerSumCombiner();
    }

    private static class IntegerSumCombiner extends Combiner<Integer, Integer> {
        private int sum;

        @Override
        public void combine(Integer value) {
            sum += value;
        }

        @Override
        public Integer finalizeChunk() {
            return sum;
        }

        @Override
        public void reset() {
            sum = 0;
        }
    }
}

3.2の頃と比べて、Combinerにresetというメソッドが増えている気がします…。

それからCollator。
src/main/java/org/littlewings/hazelcast/aggregators/IntegerSumCollator.java

package org.littlewings.hazelcast.aggregators;

import java.util.Map;
import java.util.stream.StreamSupport;

import com.hazelcast.mapreduce.Collator;

public class IntegerSumCollator implements Collator<Map.Entry, Integer> {
    public Integer collate(Iterable<Map.Entry> values) {
        return StreamSupport.stream(values.spliterator(), false)
            .mapToInt(entry -> ((Map.Entry<String, Integer>) entry).getValue())
            .sum();
    }
}

こちらも単純なのですが、ムダにStreamSupportとか使ってみました。

そして、これらを利用するAggregation。
src/main/java/org/littlewings/hazelcast/aggregators/IntegerSumAggregation.java

package org.littlewings.hazelcast.aggregators;

import java.util.Map;

import com.hazelcast.mapreduce.Collator;
import com.hazelcast.mapreduce.CombinerFactory;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.ReducerFactory;
import com.hazelcast.mapreduce.aggregation.Aggregation;
import com.hazelcast.mapreduce.aggregation.Supplier;

public class IntegerSumAggregation implements Aggregation<String, Integer, Integer> {
    @Override
    public Collator<Map.Entry, Integer> getCollator() {
        return new IntegerSumCollator();
    }

    @Override
    public CombinerFactory<String, Integer, Integer> getCombinerFactory() {
        return new IntegerSumCombinerFactory();
    }

    @Override
    public Mapper<String, Integer, String, Integer> getMapper(Supplier<String, ?, Integer> supplier) {
        return new SupplierDelegateMapper(supplier);
    }

    public ReducerFactory<String, Integer, Integer> getReducerFactory() {
        return new IntegerSumReducerFactory();
    }
}

なんとなく、Combinerを使用しないバージョンも用意してみました。

public class IntegerSumAggregationNoCombiner implements Aggregation<String, Integer, Integer> {
    @Override
    public Collator<Map.Entry, Integer> getCollator() {
        return new IntegerSumCollator();
    }

    @Override
    public CombinerFactory<String, Integer, Integer> getCombinerFactory() {
        return null;
    }

    @Override
    public Mapper<String, Integer, String, Integer> getMapper(Supplier<String, ?, Integer> supplier) {
        return new SupplierDelegateMapper(supplier);
    }

    public ReducerFactory<String, Integer, Integer> getReducerFactory() {
        return new IntegerSumReducerFactory();
    }
}

そして、これらを使ったテストコード。
src/test/java/org/littlewings/hazelcast/aggregators/MyAggregationTest.java

package org.littlewings.hazelcast.aggregators;

import static org.assertj.core.api.Assertions.*;

import java.util.stream.IntStream;

import com.hazelcast.core.IMap;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.aggregation.Supplier;

import org.junit.Test;

public class MyAggregationTest implements HazelcastTestSupport {
    // ここに、テストコードを書く!
}

Combinerを使用する版。

    @Test
    public void testMyAggregation() {
        int result = withHazelcast(2, "hazelcast-aggregations.xml", hazelcast -> {
            assertThat(hazelcast.getConfig().getMapConfig("aggregateMap").getBackupCount())
                .isEqualTo(1);
            assertThat(hazelcast.getConfig().getJobTrackerConfig("aggregateJobTracker").getChunkSize())
                .isEqualTo(500);

            IMap<String, Integer> map = hazelcast.getMap("aggregateMap");
            IntStream.rangeClosed(1, 20).forEach(i -> map.put("key" + i, i));

            JobTracker jobTracker = hazelcast.getJobTracker("aggregateJobTracker");

            return map.aggregate(Supplier.fromPredicate(new EvenPredicate(), new DoublingSupplier()),
                                 new IntegerSumAggregation(),
                                 jobTracker);
        });

        assertThat(result)
            .isEqualTo(220);

Combinerを使用しない版。

    @Test
    public void testMyAggregationNoCombiner() {
        int result = withHazelcast(2, "hazelcast-aggregations.xml", hazelcast -> {
            assertThat(hazelcast.getConfig().getMapConfig("aggregateMap").getBackupCount())
                .isEqualTo(1);
            assertThat(hazelcast.getConfig().getJobTrackerConfig("aggregateJobTracker").getChunkSize())
                .isEqualTo(500);

            IMap<String, Integer> map = hazelcast.getMap("aggregateMap");
            IntStream.rangeClosed(1, 20).forEach(i -> map.put("key" + i, i));

            JobTracker jobTracker = hazelcast.getJobTracker("aggregateJobTracker");

            return map.aggregate(Supplier.fromPredicate(new EvenPredicate(), new DoublingSupplier()),
                                 new IntegerSumAggregationNoCombiner(),
                                 jobTracker);
        });

        assertThat(result)
            .isEqualTo(220);
    }

設定ファイルは、こんな感じで仕込みました。
src/test/resources/hazelcast-aggregations.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.3.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <map name="aggregateMap">
    <backup-count>1</backup-count>
  </map>

  <jobtracker name="aggregateJobTracker">
    <max-thread-size>0</max-thread-size>
    <queue-size>0</queue-size>
    <retry-count>0</retry-count>
    <chunk-size>500</chunk-size>
    <communicate-stats>true</communicate-stats>
    <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
  </jobtracker>
</hazelcast>

結局、Supplier#fromPredicate使ってるじゃん…という点はありますが。これも使わないようにした場合は、内部で使用されているcom.hazelcast.mapreduce.aggregation.impl.PredicateSupplierクラスのインスタンスを直接生成する形になるでしょう。

その他、作成したMapperがちょっといびつだったりといろいろありますが、けっこう良い勉強になりました。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-aggregators