CLOVER🍀

That was when it all began.

はじめてのHazelcast Jet(Embedded Distributed Stream API)

今年の2月に、Hazelcastの機能拡張として、Hazelcast Jetというプロダクトがリリースされました。

Hazelcast Jet - In-Memory Streaming and Fast Batch Processing - High-Performance Stream Processing

Introducing Hazelcast Jet 0.3 | Hazelcast Blog

InfoQで紹介されていたりもします。

Hazelcastがオープンソースのストリーム処理エンジンJetをリリース

この時のHazelcast Jetのバージョンは0.3だったのですが、最近0.4がリリースされました。

Jet 0.4 is Released | Hazelcast Blog

0.3の時から気にはなっていたのですが、ちょっと機能的に欠けているかな?と思っていました。0.4になって、そのあたりも
強化されつつあるので、そろそろ…と思って触ってみることにしました。

Hazelcast Jetとは?

Hazelcast Jetは、Hazelcastをベースにしたストリーム処理エンジンのようです。

Hazelcastがオープンソースのストリーム処理エンジンJetをリリース

Hazelcast上の分散データ構造をソースにしたストリーム処理、分散Stream APIJava標準Stream APIの拡張)、Window処理などが可能です。
まだ数は多くはないですが、Apache KafkaやHDFSなど、いくつかのSource/Sinkも提供しています。

Hazelcast Jetの機能については、こちら。

Hazelcast Jet Features - Hazelcast Jet - High-Performance Stream Processing

まだ全貌ははっきりと把握できていませんが、ちょっとずつ試しつつ慣れていこうかなと思います。

はじめてのHazelcast Jet

それでは、まずはGetting Startedに習ってDistributed Stream APIを試してみたいと思います。

Get Started - Hazelcast - High-Performance Stream Processing

ドキュメントはこちら。

Hazelcast Jet Reference Manual

Hazelcast Jetを使うためのMaven依存関係は、こちら。

        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet-core</artifactId>
            <version>0.4</version>
        </dependency>

もしくは、こちらです。

        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet</artifactId>
            <version>0.4</version>
        </dependency>

「hazelcast-jet-core」と「hazelcast-jet」の違いですが、「hazelcast-jet-core」の場合、Hazelcast JetおよびHazelcast JetのClient Protorol、
それからHazelcastとHazelcast Clientが依存関係としてそれぞれ追加されます。

「hazelcast-jet」の場合、これらの依存関係がMaven Shade PluginでひとまとまりのJARとして依存関係に追加されます。加えて、ちょっとした
serverパッケージがJARに追加されます。この形態は、Hazelcast Jet 0.4から入ったものです。

状況に合わせて好きな方を選ぶとよいでしょう。今回は「hazelcast-jet-core」を使うことにします。

なお、Hazelcast JetにはEmbeddedのみかClient/Serverかという差は、JARの分割単位としてはありません。Hazelcast自体に両者の概念は
ありますが、Hazelcast Jetの場合は同じJARでEmbeddedかClient/Serverかを選ぶことができます。

今回は、以下のお題でやることにしましょう。

Hazelcast Jetのjava.util.streamの拡張についてのドキュメントは、こちら。

java.util.stream Support for Hazelcast IMDG

WordCount対象のテキストは、該当のページをテキストに落として読み込むことにしました。
※後半は省略

$ head -n 20 src/main/resources/jet-0_4-release.txt 
We are happy to announce the release of Hazelcast Jet 0.4. This is the first major release since our inital version and has many improvements bringing us closer to our final vision for Jet:

Improved streaming support including windowing support with event-time semantics.
Out of the box support for tumbling, sliding and session window aggregations.
New AggregateOperation abstraction with several built-in ones including count, average, sum, min, max and linear regression.
Hazelcast version updated to 3.8.2 and Hazelcast IMDG is now shaded inside hazelcast-jet.jar.
Several built-in diagnostic processors and unit test support for writing custom processors.
Many new code samples including several streaming examples and enrichment and co-group for batch operations.
New sources and sinks including ICache, socket and file.
Windowing Support with Event-Time semantics

The biggest improvement is the addition of tools for using Jet on infinite streams of data. Dealing with streaming data is fundamentally different than batch or micro-batch processing as both input and output is continuous. Most streaming computations also deal with some notion of time where you are interested in how a value changes over time. The typical way to deal with streaming data is to look at it in terms of “windows”, where a window represents a slice of the data stream, usually constrained for a period of time.

Jet 0.4 adds several processors which deal specifically with aggregation of streaming data into windows. Types of windowing supported can be summarised as follows:

Tumbling Windows: Fixed-size, non-overlapping and contiguous intervals. For example a tumbling window of 1 minute would mean that each window is of 1 minute length, are not overlapping, and contiguous to each other.
Sliding Windows: A variation on tumbling windows, sliding windows can have overlapping elements and slide in fixed increments as time advances.
Session Windows: Typically captures a burst of events which are separated by a period of inactivity. This can be used to window events by a user session, for example.
Jet also supports the notion of “event-time” where events can have their own timestamp and can arrive out of order. This is achieved by inserting watermarks into the stream of events which drive the passage of time forward.

プログラムとしては、以下の2つを用意します。

  • とりあえず浮いていてもらう、EmbeddedなHazelcast Jetサーバー
  • Hazelcast Jetクラスタにデータを入れ、WordCountを行う

まずはEmbeddedなHazelcast Jetサーバーから。

こちらは、とても簡単です。
src/main/java/org/littlewings/hazelcast/jet/EmbeddedJetServer.java

package org.littlewings.hazelcast.jet;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;

public class EmbeddedJetServer {
    public static void main(String... args) {
        JetInstance jet = Jet.newJetInstance();

        System.console().readLine("> stop enter.");

        jet.shutdown();
        Jet.shutdownAll();
    }
}

Hazelcastを単体で使う場合と、そう変わりません。Enterを打ったら、終了とします。

とりあえず、このサーバーには2 Node浮いていてもらいましょう。

## Node 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.EmbeddedJetServer

## Node 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.EmbeddedJetServer

いつものHazelcastな感じでクラスタが構成されますが

Members [2] {
	Member [172.18.0.1]:5702 - 406aba82-b077-4dd4-acd4-df9d6cb0398c
	Member [172.18.0.1]:5701 - 58d1349c-81ec-4c7b-a085-4f211b6d4112 this
}

ASCIIアートが出るようになってたりで、ちょっとビックリします。

6 24, 2017 12:00:56 午前 com.hazelcast.jet.impl.JetService
情報: [172.18.0.1]:5701 [jet] [0.4] [3.8.2] 
	o   o   o   o---o o---o o     o---o   o   o---o o-o-o        o o---o o-o-o
	|   |  / \     /  |     |     |      / \  |       |          | |       |  
	o---o o---o   o   o-o   |     o     o---o o---o   |          | o-o     |  
	|   | |   |  /    |     |     |     |   |     |   |      \   | |       |  
	o   o o   o o---o o---o o---o o---o o   o o---o   o       o--o o---o   o   
6 24, 2017 12:00:56 午前 com.hazelcast.jet.impl.JetService
情報: [172.18.0.1]:5701 [jet] [0.4] [3.8.2] Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.

あとは、WordCountするプログラム。今回は、こんな感じで作成。
src/main/java/org/littlewings/hazelcast/jet/WordCountRunner.java

package org.littlewings.hazelcast.jet;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.function.DistributedComparator;
import com.hazelcast.jet.stream.DistributedCollectors;
import com.hazelcast.jet.stream.IStreamMap;

public class WordCountRunner {
    public static void main(String... args) {
        JetInstance jet = Jet.newJetInstance();

        IStreamMap<Integer, String> map = jet.getMap("source");

        try (InputStream is = WordCountRunner.class.getClassLoader().getResourceAsStream("jet-0_4-release.txt");
             InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
             BufferedReader reader = new BufferedReader(isr);
             Stream<String> lines = reader.lines()) {
            AtomicInteger count = new AtomicInteger();
            lines.forEach(line -> map.put(count.incrementAndGet(), line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        IStreamMap<String, Integer> wordCount =
                map
                        .stream()
                        .map(e -> e.getValue().replaceAll("[=()\"',.:;”-]", " ").trim())
                        .filter(line -> !line.isEmpty())
                        .flatMap(line -> Arrays.stream(line.split(" +")))
                        .map(word -> {
                            String lc = word.toLowerCase();
                            System.out.printf("[%s] %s%n", Thread.currentThread().getName(), lc);
                            return lc;
                        })
                        .collect(DistributedCollectors.toIMap(
                                "word-count-map",
                                word -> word,
                                word -> 1,
                                Integer::sum)
                        );

        List<Map.Entry<String, Integer>> top20 =
                wordCount
                        .stream()
                        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                        .limit(20)
                        .collect(DistributedCollectors.toList());

        System.out.println("result:");
        top20.forEach(e -> System.out.println("  " + e));

        System.out.println();
        System.out.println("source map size = " + jet.getMap("source").size());
        System.out.println("word-count-map size = " + jet.getMap("word-count-map").size());

        jet.shutdown();
        Jet.shutdownAll();
    }
}

少し解説します。

まず、Hazelcast Jetのインスタンスを取得し、IStreamMapを取得します。

        JetInstance jet = Jet.newJetInstance();

        IStreamMap<Integer, String> map = jet.getMap("source");

このIStreamMapには、WordCount対象のデータをロードします。使い方は、"API的には"ふつうのMapと同じで良さそうです。

        try (InputStream is = WordCountRunner.class.getClassLoader().getResourceAsStream("jet-0_4-release.txt");
             InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
             BufferedReader reader = new BufferedReader(isr);
             Stream<String> lines = reader.lines()) {
            AtomicInteger count = new AtomicInteger();
            lines.forEach(line -> map.put(count.incrementAndGet(), line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

そして、このIStreamMapから生成したStreamに対して、WordCountしてみます。

        IStreamMap<String, Integer> wordCount =
                map
                        .stream()
                        .map(e -> e.getValue().replaceAll("[=()\"',.:;”-]", " ").trim())
                        .filter(line -> !line.isEmpty())
                        .flatMap(line -> Arrays.stream(line.split(" +")))
                        .map(word -> {
                            String lc = word.toLowerCase();
                            System.out.printf("[%s] %s%n", Thread.currentThread().getName(), lc);
                            return lc;
                        })
                        .collect(DistributedCollectors.toIMap(
                                "word-count-map",
                                word -> word,
                                word -> 1,
                                Integer::sum)
                        );

パッと見、ほぼStream APIなのですが、最後のCollectorだけちょっと変わっています。

                        .collect(DistributedCollectors.toIMap(
                                "word-count-map",
                                word -> word,
                                word -> 1,
                                Integer::sum)
                        );

Collectorがシリアライズされて転送されることになるため、ちょっとトリックが入っています。

なお、その他の「パッと見」は通常のStream APIのように見えますが、インターフェースレベルでシリアライズ可能なFunctionなどを受け取れるように
細工が入ったりしています。

DistributedStream (hazelcast-jet-all 0.4 API)

また、このCollectorでは、最後にIStreamMapに結果を登録します。

                        .collect(DistributedCollectors.toIMap(
                                "word-count-map",
                                word -> word,
                                word -> 1,
                                Integer::sum)
                        );

その時の名前は「word-count-map」としていますが、このメソッドの戻り値はIStreamMapとなり、そのまま使うことができます。
Hazelcast Jet内のIStreamMapとしても作成済みなので、Jetインスタンスから取得することも可能です。もちろん、結果を
IStreamMapではなくて、ふつうのローカルなMapに入れることもできます。

ちなみに、mapについては、各単語を小文字に変換しつつ、この時にクラスタ内で分散実行されていることを確認するために
標準出力にスレッド名などを出力しておきます。

                        .map(word -> {
                            String lc = word.toLowerCase();
                            System.out.printf("[%s] %s%n", Thread.currentThread().getName(), lc);
                            return lc;
                        })

あとは、ソートして上位20件を取得して終了。Comparatorについても、シリアライズするためのトリックが入っています。

        List<Map.Entry<String, Integer>> top20 =
                wordCount
                        .stream()
                        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                        .limit(20)
                        .collect(DistributedCollectors.toList());

        System.out.println("result:");
        top20.forEach(e -> System.out.println("  " + e));

各IStreamMapにも、データが入っていることを確認しておきます。

        System.out.println("source map size = " + jet.getMap("source").size());
        System.out.println("word-count-map size = " + jet.getMap("word-count-map").size());

最後は、シャットダウンしておしまいです。

        jet.shutdown();
        Jet.shutdownAll();

では、準備ができたので実行。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.WordCountRunner

先ほど用意したクラスタに参加して

Members [3] {
	Member [172.18.0.1]:5701 - 4393bbe4-c101-4bcb-9136-e73975d18538
	Member [172.18.0.1]:5702 - 3c7506c7-def9-43a1-bf9f-6fbb49c28ab0
	Member [172.18.0.1]:5703 - a651207f-f1fa-4ec4-8786-39fcade4ce6b this
}

どういうプランで実行されるのかが、表示されます。

情報: [172.18.0.1]:5703 [jet] [0.4] [3.8.2] Start executing job 10000: dag
    .vertex("read-map-source")
    .vertex("transform-42c6002251bb")
    .vertex("merge-local")
    .vertex("merge-distributed")
    .vertex("write-map-word-count-map")
    .edge(between("read-map-source", "transform-42c6002251bb"))
    .edge(between("transform-42c6002251bb", "merge-local").partitioned(?))
    .edge(between("merge-local", "merge-distributed").partitioned(?).distributed())
    .edge(between("merge-distributed", "write-map-word-count-map"))

このあたりはまだ読めないので、おいおいですね。雰囲気はなんとなくわからないでも…ではありますが。

各Nodeで分散実行されていることをがわかり

## Word Count Node
情報: [172.18.0.1]:5703 [jet] [0.4] [3.8.2] Start execution of plan for job 10000 from caller [172.18.0.1]:5703.
[hz._hzInstance_1_jet.jet.cooperative.thread-3] more
[hz._hzInstance_1_jet.jet.cooperative.thread-2] summinglong
[hz._hzInstance_1_jet.jet.cooperative.thread-1] processors
[hz._hzInstance_1_jet.jet.cooperative.thread-0] jet
[hz._hzInstance_1_jet.jet.cooperative.thread-0] 0
[hz._hzInstance_1_jet.jet.cooperative.thread-0] 4
[hz._hzInstance_1_jet.jet.cooperative.thread-0] adds
[hz._hzInstance_1_jet.jet.cooperative.thread-1] map
[hz._hzInstance_1_jet.jet.cooperative.thread-2] maxby
〜省略〜


## Node 2
[hz._hzInstance_1_jet.jet.cooperative.thread-0] hazelcast
[hz._hzInstance_1_jet.jet.cooperative.thread-0] icache
[hz._hzInstance_1_jet.jet.cooperative.thread-0] can
[hz._hzInstance_1_jet.jet.cooperative.thread-0] be
[hz._hzInstance_1_jet.jet.cooperative.thread-0] used
[hz._hzInstance_1_jet.jet.cooperative.thread-0] as
[hz._hzInstance_1_jet.jet.cooperative.thread-0] a
[hz._hzInstance_1_jet.jet.cooperative.thread-0] source
[hz._hzInstance_1_jet.jet.cooperative.thread-0] or
[hz._hzInstance_1_jet.jet.cooperative.thread-0] sink
[hz._hzInstance_1_jet.jet.cooperative.thread-0] and
[hz._hzInstance_1_jet.jet.cooperative.thread-0] it
[hz._hzInstance_1_jet.jet.cooperative.thread-0] can
[hz._hzInstance_1_jet.jet.cooperative.thread-0] also
[hz._hzInstance_1_jet.jet.cooperative.thread-0] be
[hz._hzInstance_1_jet.jet.cooperative.thread-0] used
〜省略〜


## Node 3
[hz._hzInstance_1_jet.jet.cooperative.thread-0] embedded
[hz._hzInstance_1_jet.jet.cooperative.thread-0] hazelcast
[hz._hzInstance_1_jet.jet.cooperative.thread-0] version
[hz._hzInstance_1_jet.jet.cooperative.thread-0] is
[hz._hzInstance_1_jet.jet.cooperative.thread-0] updated
[hz._hzInstance_1_jet.jet.cooperative.thread-0] to
[hz._hzInstance_1_jet.jet.cooperative.thread-0] 3
[hz._hzInstance_1_jet.jet.cooperative.thread-0] 8
[hz._hzInstance_1_jet.jet.cooperative.thread-0] 2
[hz._hzInstance_1_jet.jet.cooperative.thread-0] and
[hz._hzInstance_1_jet.jet.cooperative.thread-2] vertex
[hz._hzInstance_1_jet.jet.cooperative.thread-2] log
[hz._hzInstance_1_jet.jet.cooperative.thread-0] hazelcast
〜省略〜

最後に結果表示。これは、実行したNodeで出力されます。

情報: [172.18.0.1]:5703 [jet] [0.4] [3.8.2] Execution of job 1 completed in 261ms.
result:
  and=28
  the=25
  of=24
  jet=19
  for=18
  a=17
  is=16
  can=15
  to=15
  hazelcast=13
  as=11
  be=11
  with=10
  in=10
  new=9
  streaming=9
  processors=9
  time=8
  also=8
  several=7

source map size = 87
word-count-map size = 330

2つのIStreamMapにデータが入っていることもわかります。

とりあえず、動きましたね、と。

まとめ

はじめてのHazelcast Jetということで、とりあえずEmbedded Modeで分散Stream APIを使ってWord Countしてみました。

これから、ちょっとずつ見ていこうかなと思います。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-jet-examples/tree/master/embedded-getting-started