CLOVER🍀

That was when it all began.

Hazelcast Jetで、DAGを使ってジョブを書いてみる

Hazelcast JetのGetting Startedに載っているのは、Hazelcast Jet上でStream APIを分散実行してWordCountするものでした。

Get Started - Hazelcast - High-Performance Stream Processing

マニュアルの方では、もう一歩進んでDAGベースのジョブ定義がしてあるので、こちらを使ってコードを書いてみましょう。

Hazelcast Jet 101 - Word Counting Batch Job - Hazelcast Jet Reference Manual

この時、単純にコードを書き写すのもなんなので、前に書いたこちらのエントリのコードをベースにしつつ
はじめてのHazelcast Jet(Embedded Distributed Stream API) - CLOVER

もう少しアレンジして書いてみたいと思います。

まあ、ネタは結局WordCountなのですが。

準備

Maven依存関係としては、hazelcast-jet-coreがあればOKです。

        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet-core</artifactId>
            <version>0.4</version>
        </dependency>

あとは、単に浮いていてもらうHazelcast Jetインスタンス、ジョブを開始するインスタンスをそれぞれ用意しようと
思うので、まずは単に浮いていてもらう方を用意しておきます。
src/main/java/org/littlewings/hazelcast/jet/EmbeddedJetServer.java

package org.littlewings.hazelcast.jet;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;

public class EmbeddedJetServer {
    public static void main(String... args) {
        JetInstance jet = Jet.newJetInstance();

        System.console().readLine("> stop enter.");

        jet.shutdown();
        Jet.shutdownAll();
    }
}

あと、WordCountする対象のデータは、前回同様こちらのブログエントリとします。
Jet 0.4 is Released | Hazelcast Blog

こんな感じ。
※後半は省略

$ head -n 20 src/main/resources/jet-0_4-release.txt 
We are happy to announce the release of Hazelcast Jet 0.4. This is the first major release since our inital version and has many improvements bringing us closer to our final vision for Jet:

Improved streaming support including windowing support with event-time semantics.
Out of the box support for tumbling, sliding and session window aggregations.
New AggregateOperation abstraction with several built-in ones including count, average, sum, min, max and linear regression.
Hazelcast version updated to 3.8.2 and Hazelcast IMDG is now shaded inside hazelcast-jet.jar.
Several built-in diagnostic processors and unit test support for writing custom processors.
Many new code samples including several streaming examples and enrichment and co-group for batch operations.
New sources and sinks including ICache, socket and file.
Windowing Support with Event-Time semantics

The biggest improvement is the addition of tools for using Jet on infinite streams of data. Dealing with streaming data is fundamentally different than batch or micro-batch processing as both input and output is continuous. Most streaming computations also deal with some notion of time where you are interested in how a value changes over time. The typical way to deal with streaming data is to look at it in terms of “windows”, where a window represents a slice of the data stream, usually constrained for a period of time.

Jet 0.4 adds several processors which deal specifically with aggregation of streaming data into windows. Types of windowing supported can be summarised as follows:

Tumbling Windows: Fixed-size, non-overlapping and contiguous intervals. For example a tumbling window of 1 minute would mean that each window is of 1 minute length, are not overlapping, and contiguous to each other.
Sliding Windows: A variation on tumbling windows, sliding windows can have overlapping elements and slide in fixed increments as time advances.
Session Windows: Typically captures a burst of events which are separated by a period of inactivity. This can be used to window events by a user session, for example.
Jet also supports the notion of “event-time” where events can have their own timestamp and can arrive out of order. This is achieved by inserting watermarks into the stream of events which drive the passage of time forward.

こちらを、Distributed Mapに放り込んで使います。

とりあえず、準備はここまで。

サンプルコード

最初に、できあがったコードから。
src/main/java/org/littlewings/hazelcast/jet/WordCountRunner.java

package org.littlewings.hazelcast.jet;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import com.hazelcast.jet.AggregateOperations;
import com.hazelcast.jet.DAG;
import com.hazelcast.jet.Edge;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Partitioner;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Vertex;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.processor.Processors;
import com.hazelcast.jet.processor.Sinks;
import com.hazelcast.jet.processor.Sources;
import com.hazelcast.jet.stream.DistributedCollectors;
import com.hazelcast.jet.stream.IStreamMap;

public class WordCountRunner {
    public static void main(String... args) throws ExecutionException, InterruptedException {
        JetInstance jet = Jet.newJetInstance();

        IStreamMap<Integer, String> map = jet.getMap("source");

        load(map, "jet-0_4-release.txt");

        DAG dag = new DAG();

        Vertex source = dag.newVertex("source", Sources.readMap("source"));

        Vertex tokenize = dag.newVertex(
                "tokenize",
                Processors
                        .flatMap((Map.Entry<Integer, String> e) -> {
                            System.out.printf("[%s] tokenize: %s%n", Thread.currentThread().getName(), e);
                            return Traversers.traverseArray(
                                    e.getValue().replaceAll("[=()\"',.:;”-]", " ").trim().split(" +"));
                        }));
        Vertex filter = dag.newVertex("filter", Processors.filter((String s) -> {
            System.out.printf("[%s] filter: %s%n", Thread.currentThread().getName(), s);
            return !s.isEmpty();
        }));
        Vertex lower = dag.newVertex("lower", Processors.map((String s) -> {
            System.out.printf("[%s] lower: %s%n", Thread.currentThread().getName(), s);
            return s.toLowerCase();
        }));
        Vertex accumulate = dag.newVertex(
                "accumulate",
                Processors.accumulateByKey(DistributedFunctions.wholeItem(), AggregateOperations.counting())
        );
        Vertex combine = dag.newVertex("combine",
                Processors.combineByKey(AggregateOperations.counting())
        );

        Vertex sink = dag.newVertex("sink", Sinks.writeMap("word-count-map"));

        dag.edge(Edge.between(source, tokenize))
                .edge(Edge.between(tokenize, filter))
                .edge(Edge.between(filter, lower))
                .edge(Edge.between(lower, accumulate)
                        .partitioned(DistributedFunctions.wholeItem(), Partitioner.HASH_CODE))
                .edge(Edge.between(accumulate, combine)
                        .distributed()
                        .partitioned(DistributedFunctions.entryKey()))
                .edge(Edge.between(combine, sink));

        jet.newJob(dag).execute().get();

        IStreamMap<String, Long> wordCount = jet.getMap("word-count-map");

        List<Map.Entry<String, Long>> top20 =
                wordCount
                        .stream()
                        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                        .limit(20)
                        .collect(DistributedCollectors.toList());


        System.out.println("result:");
        top20.forEach(e -> System.out.println("  " + e));

        System.out.println();
        System.out.println("source map size = " + jet.getMap("source").size());
        System.out.println("word-count-map size = " + jet.getMap("word-count-map").size());

        jet.shutdown();
        Jet.shutdownAll();
    }

    static void load(IStreamMap<Integer, String> map, String fileName) {
        try (InputStream is = WordCountRunner.class.getClassLoader().getResourceAsStream(fileName);
             InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
             BufferedReader reader = new BufferedReader(isr);
             Stream<String> lines = reader.lines()) {
            AtomicInteger count = new AtomicInteger();
            lines.forEach(line -> map.put(count.incrementAndGet(), line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

もともと、「WordCountして上位20件を抽出する」という処理をStream APIを使って書いていたのですが、
はじめてのHazelcast Jet(Embedded Distributed Stream API) - CLOVER

今回はDAGを使ってやるので全部つなげて書こうかと思いきや…SortとLimitがDAGを使って書けないことがわかり(APIが公開されていない)、
結局SortとLimitをStream APIでやることに…。

DAGを使ってジョブを組み上げているところはこちら、

        DAG dag = new DAG();

        Vertex source = dag.newVertex("source", Sources.readMap("source"));

        Vertex tokenize = dag.newVertex(
                "tokenize",
                Processors
                        .flatMap((Map.Entry<Integer, String> e) -> {
                            System.out.printf("[%s] tokenize: %s%n", Thread.currentThread().getName(), e);
                            return Traversers.traverseArray(
                                    e.getValue().replaceAll("[=()\"',.:;”-]", " ").trim().split(" +"));
                        }));
        Vertex filter = dag.newVertex("filter", Processors.filter((String s) -> {
            System.out.printf("[%s] filter: %s%n", Thread.currentThread().getName(), s);
            return !s.isEmpty();
        }));
        Vertex lower = dag.newVertex("lower", Processors.map((String s) -> {
            System.out.printf("[%s] lower: %s%n", Thread.currentThread().getName(), s);
            return s.toLowerCase();
        }));
        Vertex accumulate = dag.newVertex(
                "accumulate",
                Processors.accumulateByKey(DistributedFunctions.wholeItem(), AggregateOperations.counting())
        );
        Vertex combine = dag.newVertex("combine",
                Processors.combineByKey(AggregateOperations.counting())
        );

        Vertex sink = dag.newVertex("sink", Sinks.writeMap("word-count-map"));

        dag.edge(Edge.between(source, tokenize))
                .edge(Edge.between(tokenize, filter))
                .edge(Edge.between(filter, lower))
                .edge(Edge.between(lower, accumulate)
                        .partitioned(DistributedFunctions.wholeItem(), Partitioner.HASH_CODE))
                .edge(Edge.between(accumulate, combine)
                        .distributed()
                        .partitioned(DistributedFunctions.entryKey()))
                .edge(Edge.between(combine, sink));

実行。

        jet.newJob(dag).execute().get();

データのロードとSort/Limitはそう面白いところがないのではしょります。

実行結果

では、できあがったジョブを動かしてみます。

## Node 1
$ mvn exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.EmbeddedJetServer


## Node 2
$ mvn exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.EmbeddedJetServer


## WordCountRunner
$ mvn exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.WordCountRunner

結果。

まずは、ジョブをキックしたNodeで、実行内容が表示されます。

情報: [172.19.0.1]:5703 [jet] [0.4] [3.8.2] Start executing job 0: dag
    .vertex("source")
    .vertex("tokenize")
    .vertex("filter")
    .vertex("lower")
    .vertex("accumulate")
    .vertex("combine")
    .vertex("sink")
    .edge(between("source", "tokenize"))
    .edge(between("tokenize", "filter"))
    .edge(between("filter", "lower"))
    .edge(between("lower", "accumulate").partitioned(?))
    .edge(between("accumulate", "combine").partitioned(?).distributed())
    .edge(between("combine", "sink"))

各Nodeで、各フェーズが実行されている様子が標準出力からなんとなくわかり

## WordCountRunner
[hz._hzInstance_1_jet.jet.cooperative.thread-2] tokenize: 81=More information is available in the reference manual.
[hz._hzInstance_1_jet.jet.cooperative.thread-4] tokenize: 28=
[hz._hzInstance_1_jet.jet.cooperative.thread-3] tokenize: 55=Batch and streaming file reader and writers can be used for either reading static files or watching a directory for changes:
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 68=More detailed information can be found in the reference manual.
[hz._hzInstance_1_jet.jet.cooperative.thread-0] tokenize: 20=
[hz._hzInstance_1_jet.jet.cooperative.thread-0] tokenize: 12=The biggest improvement is the addition of tools for using Jet on infinite streams of data. Dealing with streaming data is fundamentally different than batch or micro-batch processing as both input and output is continuous. Most streaming computations also deal with some notion of time where you are interested in how a value changes over time. The typical way to deal with streaming data is to look at it in terms of “windows”, where a window represents a slice of the data stream, usually constrained for a period of time.
[hz._hzInstance_1_jet.jet.cooperative.thread-7] filter: can
[hz._hzInstance_1_jet.jet.cooperative.thread-7] filter: or
[hz._hzInstance_1_jet.jet.cooperative.thread-0] tokenize: 49=jetInstance.getCache('cacheName').stream().map(..)...
[hz._hzInstance_1_jet.jet.cooperative.thread-0] tokenize: 64=Vertex combine = dag.newVertex("combine", peekInput(combineByKey(counting())));
[hz._hzInstance_1_jet.jet.cooperative.thread-3] filter: file
[hz._hzInstance_1_jet.jet.cooperative.thread-3] filter: either
[hz._hzInstance_1_jet.jet.cooperative.thread-3] filter: for
[hz._hzInstance_1_jet.jet.cooperative.thread-3] tokenize: 78=              asList("foo", "bar"),
[hz._hzInstance_1_jet.jet.cooperative.thread-2] tokenize: 35=linearTrend()
[hz._hzInstance_1_jet.jet.cooperative.thread-2] filter: is
[hz._hzInstance_1_jet.jet.cooperative.thread-2] filter: streaming
[hz._hzInstance_1_jet.jet.cooperative.thread-2] filter: for
[hz._hzInstance_1_jet.jet.cooperative.thread-2] filter: directory
[hz._hzInstance_1_jet.jet.cooperative.thread-2] lower: for
[hz._hzInstance_1_jet.jet.cooperative.thread-2] lower: for
[hz._hzInstance_1_jet.jet.cooperative.thread-3] tokenize: 21=More detail description for types of windows supported by Jet can be found in the Hazelcast Jet Reference Manual.
[hz._hzInstance_1_jet.jet.cooperative.thread-6] filter: writers

〜省略〜


## Node 1
情報: [172.19.0.1]:5701 [jet] [0.4] [3.8.2] Start execution of plan for job 0 from caller [172.19.0.1]:5703.
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 72=New classes have been added to make it easier to write unit tests for processors, and avoid writing boilerplate code.
[hz._hzInstance_1_jet.jet.cooperative.thread-0] tokenize: 37=maxBy()
[hz._hzInstance_1_jet.jet.cooperative.thread-3] tokenize: 74=For example, the following will test that the processor produces the expected output, when the expected input is supplied. For cooperative processors, it will also make sure that it respects cooperative emission of output.
[hz._hzInstance_1_jet.jet.cooperative.thread-4] tokenize: 87=java -jar hazelcast-jet-0.4.jar
[hz._hzInstance_1_jet.jet.cooperative.thread-2] tokenize: 39=toList()
[hz._hzInstance_1_jet.jet.cooperative.thread-6] tokenize: 1=We are happy to announce the release of Hazelcast Jet 0.4. This is the first major release since our inital version and has many improvements bringing us closer to our final vision for Jet:
[hz._hzInstance_1_jet.jet.cooperative.thread-5] tokenize: 80=      );
[hz._hzInstance_1_jet.jet.cooperative.thread-7] tokenize: 86=
[hz._hzInstance_1_jet.jet.cooperative.thread-4] filter: added
[hz._hzInstance_1_jet.jet.cooperative.thread-4] filter: tests
[hz._hzInstance_1_jet.jet.cooperative.thread-2] tokenize: 71=
[hz._hzInstance_1_jet.jet.cooperative.thread-3] filter: been
[hz._hzInstance_1_jet.jet.cooperative.thread-3] filter: unit
[hz._hzInstance_1_jet.jet.cooperative.thread-0] tokenize: 75=
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: classes
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: to
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: writing
[hz._hzInstance_1_jet.jet.cooperative.thread-1] lower: to
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 65=
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 66=Vertex log = dag.newVertex("log", DiagnosticProcessors.writeLogger())
[hz._hzInstance_1_jet.jet.cooperative.thread-0] filter: maxBy
[hz._hzInstance_1_jet.jet.cooperative.thread-3] filter: code
[hz._hzInstance_1_jet.jet.cooperative.thread-3] filter: following

〜省略〜


## Node 2
情報: [172.19.0.1]:5702 [jet] [0.4] [3.8.2] Start execution of plan for job 0 from caller [172.19.0.1]:5703.
[hz._hzInstance_1_jet.jet.cooperative.thread-0] tokenize: 77=              Processors.map((String s) -> s.toUpperCase()),
[hz._hzInstance_1_jet.jet.cooperative.thread-2] tokenize: 4=Out of the box support for tumbling, sliding and session window aggregations.
[hz._hzInstance_1_jet.jet.cooperative.thread-2] filter: String
[hz._hzInstance_1_jet.jet.cooperative.thread-2] filter: the
[hz._hzInstance_1_jet.jet.cooperative.thread-2] filter: window
[hz._hzInstance_1_jet.jet.cooperative.thread-2] lower: window
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 85=Embedded hazelcast version is updated to 3.8.2 and Hazelcast is now shaded into Jet. As a result, hazelcast-jet.jar have no dependencies and starting a new Jet instance now is as simple as:
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: map
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: hazelcast
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: and
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: a
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: and
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: as
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: of
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: session
[hz._hzInstance_1_jet.jet.cooperative.thread-1] lower: hazelcast
[hz._hzInstance_1_jet.jet.cooperative.thread-1] lower: the
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 17=Sliding Windows: A variation on tumbling windows, sliding windows can have overlapping elements and slide in fixed increments as time advances.
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 51=Socket readers and writers can read and write to simple text based sockets. An example can be found inside the Hazelcast Jet Code Samples Repository
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 84=
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 47=Hazelcast ICache can be used as a source or sink and it can also be used as a source for distributed java.util.stream computations:
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 32=summingLong()
[hz._hzInstance_1_jet.jet.cooperative.thread-1] tokenize: 42=
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: tumbling
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: and
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: Socket
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: to
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: be
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: Repository
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: a
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: be
[hz._hzInstance_1_jet.jet.cooperative.thread-1] filter: util
[hz._hzInstance_1_jet.jet.cooperative.thread-1] lower: and

〜省略〜

SortおよびTop 20を取るためのStreamのプランが今度は表示され

情報: [172.19.0.1]:5703 [jet] [0.4] [3.8.2] Start executing job 1: dag
    .vertex("read-map-word-count-map")
    .vertex("sort-398f11eb4594").localParallelism(1)
    .vertex("limit-local-f6f0c1f2e2aa").localParallelism(1)
    .vertex("accumulator").localParallelism(1)
    .vertex("combiner").localParallelism(1)
    .vertex("write-__jet_list_8c8b9854-89a7-4113-90fe-ca5e9f333a08")
    .edge(between("read-map-word-count-map", "sort-398f11eb4594").partitioned(?).distributed())
    .edge(between("sort-398f11eb4594", "limit-local-f6f0c1f2e2aa"))
    .edge(between("limit-local-f6f0c1f2e2aa", "accumulator"))
    .edge(between("accumulator", "combiner").partitioned(?).distributed())
    .edge(between("combiner", "write-__jet_list_8c8b9854-89a7-4113-90fe-ca5e9f333a08"))

最後に結果になります。

情報: [172.19.0.1]:5703 [jet] [0.4] [3.8.2] Execution of job 1 completed in 207ms.
result:
  and=28
  the=25
  of=24
  jet=19
  for=18
  a=17
  is=16
  can=15
  to=15
  hazelcast=13
  as=11
  be=11
  in=10
  with=10
  processors=9
  new=9
  streaming=9
  also=8
  time=8
  stream=7

source map size = 87
word-count-map size = 330

もちろんですが、前回のエントリと結果は同じです。

はじめてのHazelcast Jet(Embedded Distributed Stream API) - CLOVER

では、このDAGまわりをもう少し見ていってみましょう。

DAG(directed acyclic graphs)

ジョブを書いていく際には、まずDAG(directed acyclic graphs)のインスタンスが必要になります。

        DAG dag = new DAG();

このDAGを使って、

  • 入力元のDistributed Mapからデータを取得
  • (一部記号などを削って)ホワイトスペースで単語分割
  • 空文字は対象外
  • 小文字に変換
  • キーでカウント
  • 結果をDistributed Mapに書き出す

というジョブを表現していきます。

DAGのインスタンス構築に、Hazelcast Jetのインスタンスは直接関与しません。ジョブの記述をHazelcast Jetから分離して書けるようにすることが
目的のようです。

DAG - Hazelcast Jet Reference Manual

Vertex

DAGのインスタンスから、ジョブの要素となるVertexを作っていきます。

DAG - Hazelcast Jet Reference Manual

Source/Processor

Vertexは、DAGのインスタンスより、名前とSupplier(DistributedSupplier/ProcessorSupplier/ProcessorMetaSupplierのいずれか)を指定して
作成します。

たとえば、ここでは「source」という名前で、Distributed Map(この名前も「source」)を元にしたProcessorMetaSupplierを与えてVertexを
作成します。

        Vertex source = dag.newVertex("source", Sources.readMap("source"));

各種Supplierの違いは、以下のようです。

  • DistributedSupplier … getメソッドを実装し、Processorを返すSupplier。初期化やクリーンアップは実装できない(getのみ実装する)
  • ProcessorSupllier … クラスタ内のひとつのMember上で動作する、ローカルのパーティション処理に特化した全Processorを、1回の呼び出しで作成する。初期化と終了処理の実装が可能
  • ProcessorMetaSupplier … 1回の呼び出しで全Processorを作成する。Memberのリストが与えられると、MemberAddressに応じたProcessorSupplierを返すFunctionを返すように実装される

というわけで、Processorを得るためのもののようです。ちなみに、この3つはいずれもSerializableです。

Processorというのは、「0以上の入力ストリームからデータを受信し、0以上の出力ストリームにデータを送信する」という役割を
負っているようです。

Processor - Hazelcast Jet Reference Manual

今回は、ProcessorMetaSupplierを使って単語分割するVertexなどを定義しています。

        Vertex tokenize = dag.newVertex(
                "tokenize",
                Processors
                        .flatMap((Map.Entry<Integer, String> e) -> {
                            System.out.printf("[%s] tokenize: %s%n", Thread.currentThread().getName(), e);
                            return Traversers.traverseArray(
                                    e.getValue().replaceAll("[=()\"',.:;”-]", " ").trim().split(" +"));
                        }));

他にも、filterやlowercaseなど。もとのGetting Startedでは、flatMapとfilterは一緒に実行されていましたが。

        Vertex filter = dag.newVertex("filter", Processors.filter((String s) -> {
            System.out.printf("[%s] filter: %s%n", Thread.currentThread().getName(), s);
            return !s.isEmpty();
        }));
        Vertex lower = dag.newVertex("lower", Processors.map((String s) -> {
            System.out.printf("[%s] lower: %s%n", Thread.currentThread().getName(), s);
            return s.toLowerCase();
        }));

どのNodeで実行されているか確認しやすいように、System.out.printlnをちょっと入れています。

AggregationOperation

AggregationOperationは、各データに対して集計関数を実行するための、5つの機能を持ちます。

AggregateOperation - Hazelcast Jet Reference Manual

  • create … 新しいアキュムレータオブジェクトを作成
  • accumulate … データによって、アキュムレータの状態を更新
  • combine … 右側のアキュムレータの内容を、左側のアキュムレータに格納
  • deduct … 右側のアキュムレータの内容は、左側のアキュムレータから取り込まれる(combineをundo)
  • finish … アキュムレータを最終結果に変換

java.util.stream.Collectorによく似たものみたいです。

今回使ったのは、まずはこちら。

        Vertex accumulate = dag.newVertex(
                "accumulate",
                Processors.accumulateByKey(DistributedFunctions.wholeItem(), AggregateOperations.counting())
        );

Processors#accumulateByKeyで、キーごとに集計します。対象のキーは全部(DistributedFunctions#wholeItem)、集計はAggregateOperations#countingで
行います。

なお、AggregateOperations#countingはこういう定義です。
https://github.com/hazelcast/hazelcast-jet/blob/v0.4/hazelcast-jet-core/src/main/java/com/hazelcast/jet/AggregateOperations.java#L59-L67

    @Nonnull
    public static AggregateOperation<Object, LongAccumulator, Long> counting() {
        return AggregateOperation.of(
                LongAccumulator::new,        // create
                (a, item) -> a.addExact(1),  // accumulate
                LongAccumulator::addExact,   // combine
                LongAccumulator::subtract,   // deduct
                LongAccumulator::get         // finish
        );
    }

続いて、個々のNodeの集計結果を合算するアキュムレータを定義します。

        Vertex combine = dag.newVertex("combine",
                Processors.combineByKey(AggregateOperations.counting())
        );

Processors#combineByKeyは、Processors#accumulateByKeyに続いて使用されているように設計されているようです。

それぞれ、こういう定義だったりします。
https://github.com/hazelcast/hazelcast-jet/blob/v0.4/hazelcast-jet-core/src/main/java/com/hazelcast/jet/processor/Processors.java#L238-L244

    @Nonnull
    public static <T, K, A> DistributedSupplier<Processor> accumulateByKey(
            @Nonnull DistributedFunction<? super T, K> getKeyF,
            @Nonnull AggregateOperation<? super T, A, ?> aggregateOperation
    ) {
        return () -> new GroupByKeyP<>(getKeyF, aggregateOperation.withFinish(identity()));
    }

https://github.com/hazelcast/hazelcast-jet/blob/v0.4/hazelcast-jet-core/src/main/java/com/hazelcast/jet/processor/Processors.java#L264-L270

    @Nonnull
    public static <A, R> DistributedSupplier<Processor> combineByKey(
            @Nonnull AggregateOperation<?, A, R> aggregateOperation
    ) {
        return () -> new GroupByKeyP<>(Entry::getKey,
                withCombiningAccumulate(Entry<Object, A>::getValue, aggregateOperation));
    }
Sink

最後は、結果をDistributed Mapに書き込みます。

        Vertex sink = dag.newVertex("sink", Sinks.writeMap("word-count-map"));

WordCountに必要なVertexは、ここまでです。

Edge

Vertexを作成したら、今度はEdgeを指定してグラフを作成する必要があります。

Edgeの説明は、DAGのドキュメントに記載してあります。
DAG - Hazelcast Jet Reference Manual

できあがったものは、こんな感じでした。

        dag.edge(Edge.between(source, tokenize))
                .edge(Edge.between(tokenize, filter))
                .edge(Edge.between(filter, lower))
                .edge(Edge.between(lower, accumulate)
                        .partitioned(DistributedFunctions.wholeItem(), Partitioner.HASH_CODE))
                .edge(Edge.between(accumulate, combine)
                        .distributed()
                        .partitioned(DistributedFunctions.entryKey()))
                .edge(Edge.between(combine, sink));

Edgeは、VertexとProcessorを、序数を指定して別のVertexに接続します。Processorは、データを受け取るとそのデータが入った序数を知ることが
できます。出力側も似たようなものです。Processorはある序数にデータをemitしますが、同じデータをすべての序数にemitすることもあります。
これは典型的なケースであり、いくつかのEdgeでデータストリームを簡単にコピーできます。

betweenを使用すると、両端は序数0で構成されます。別の序数が必要な場合は、from(a, ord1).to(b, ord2)の形式を使用します。

序数には、ギャップが内容に割り当てる必要があります。これは、Vertexが序数0〜NのインバウンドなEdgeを持ち、序数が0〜Mのアウトバウンドな
Edgeを持つことを意味します。

今回の場合は、全部序数が0ですね。

あと、ちょっと変わったものとしてpartitionedとdistributedというものが使われています。

今回の例では、各データがネットワークをまたがないように、各Nodeがローカルに持つデータを対象とするようにしています。

partitionedでは、Partitioning Key(グループキーと同じDistributedFunctions#wholeItem)とPartition IDをキーからどのようにして算出するかを
指定します。ここでは、Object#hashCodeをベースにするPartitioner#HASH_CODEを指定しています。これにより、各キーのオーナーがデータを
扱うようになります、と。

                .edge(Edge.between(lower, accumulate)
                        .partitioned(DistributedFunctions.wholeItem(), Partitioner.HASH_CODE))

partitionedは、データのルーティングポリシーのひとつです。全部でUnicast、Broadcast、Partitioned、All-To-Oneの4つがあるみたいです。
また、Partition IDごとにProcessorができるみたいですよ。

続いて、combine。今回は、こんな感じになりました。

                .edge(Edge.between(accumulate, combine)
                        .distributed()
                        .partitioned(DistributedFunctions.entryKey()))

デフォルトでEdgeはローカルで動作し、distributedを呼び出すことでこの制限が解除されます。とすると、これまでのEdgeはすべてローカルNodeで
動くように指示されていたということですね。

このあたりは、ネットワークトラフィックを抑える目的のためみたいです。

Job

ここまで組み上がったら、Jobを実行します。

        jet.newJob(dag).execute().get();

この結果はvoidなので、特に戻り値を見ることはありません。

Job - Hazelcast Jet Reference Manual

この他、JARをデプロイしたりすることもできるようですが…?

まとめ

Hazelcast Jetを、Stream APIではなくてDAGを使ってJobを組み上げる形で使ってみました。Getting Startedを見ているだけだとなんとなく雰囲気しか
わかりませんでしたが、ドキュメントとにらめっこしているとだんだん意味がわかってきました…。

もうちょっと慣れていきたいですね。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-jet-examples/tree/master/embedded-dag-getting-started