CLOVER🍀

That was when it all began.

Hazelcast JetではじめてのStreaming

Hazelcast Jetのドキュメントには、2つのGetting Startedが載っています。

Hazelcast Jet 101 - Word Counting Batch Job - Hazelcast Jet Reference Manual

Hazelcast Jet 102 - Trade Monitoring Streaming Job - Hazelcast Jet Reference Manual

少し前に、Word Countの方…Distributed Stream APIを使って遊んでいました。

今回は、Streamingの方を見ていきたいと思います。

まずは、Getting Started(Trade Monitoring Streaming Job)に書かれている内容をざっくりと見ていってみます。

Streaming?

Word CountのGetting Startedでは、あらかじめ登録済みのデータに対してDistributed Stream APIを使用して集計処理を
したものでした。

今回、Streamingとなると、ジョブの組み方はまあ近いのですが、今度はそのStreamが無限Streamとなります。

Sliding Window?

Word Countの例では、入力となるデータを全部集計して結果を出しました。無限Streamの場合は、そうはいかないのでなにか別の方法でデータを
集計する必要があります。

その方法のひとつが、Sliding Windowです。Sliding Windowでは、ある期間で区切った集計結果を算出します。

Types of Windows Supported by Jet - Hazelcast Jet Reference Manual

Time Ordering?

入力データがあったというイベントを監視する際に、イベントの発生時間は通常はStreamに登録されるデータのひとつのフィールドとして
書き込まれます。ただ、その時間どおりにStream内で順序付けされているという保証はありません。

ネットワークの遅延など、様々な理由でバラバラの時間でシステム側に届いたりするでしょう。

これは、Sliding Windowの定義を複雑にします。このようなStreamの場合は、(少なくとも部分的に)データを時間を持ったフィールドで
ソートする必要があります。これは、計算コストが高くなることを意味します。

Watermark?

Time Orderingについての問題を解決するために、Watermarkの概念を導入します。これは、Streamに挿入されたタイムスタンプ付きのアイテムで、
「この時点からこれ以下のタイムスタンプのデータはもうありません」という意味になります。

とはいえ、Watermarkが挿入された後に、Watermarkに反するデータが届くことはあります。その場合は、そのデータは「遅すぎたデータ」として
扱われ、除外されることになります。

「Watermark」という言葉は、2つの意味で使われています。

  • DAG Pipeline内の特定のプロパティとして … Watermarkの現在の値
  • データ項目として … ProcessorがWartermarkを受信したこと

Watermarkは、「イベント時間を示す時間」と考えることができ、ProcessorのWatermarkの値はWartermarkを受け取ると前進します。

Stream Skew?

Streamは、実際には並行して移動するsubstreamのセットです。それぞれのイベント間には時間差が生じることがありますが、システムでは
それをひとつのStreamであるかのごとく扱う必要があります。

この課題は、Watermarkを融合することで解決します。データがPartition化された/分散化されたEdgeを通過する際に、下流Processor
substreamから受信したWatermarkのうち、最小のWatermarkになるようにします。

お題

とまあ、ざっくりと書いてみましたが、ここから先は実際に動かして試してみましょう。

Getting Startedの例もいいのですが、今回はジョブを全部自分で組むことにします。

こんな感じで。

  • ランダムな数字を指定された時間生成し続けるProcessor(1分間、同じ数字を生成し続ける)
  • Watermarkの挿入
  • Local Nodeで集計
  • Nodeをまたいで集計
  • データのフォーマット
  • データのファイル出力

基本的には、Getting Startedからデータの生成部のみを変えた感じですね。

また、サンプルとしてはGetting Startedの他に、こちらも参照しています。
https://github.com/hazelcast/hazelcast-jet-code-samples/tree/0.4-maintenance/streaming/trade-generator/src/main/java/com/hazelcast/jet/sample/tradegenerator
https://github.com/hazelcast/hazelcast-jet-code-samples/blob/0.4-maintenance/streaming/stock-exchange/src/main/java/refman/StockExchangeRefMan.java

準備

Maven依存関係は、こんな感じ。

        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet-core</artifactId>
            <version>0.4</version>
        </dependency>

あと、分散実行のために浮いていてもらうNodeは、こんな感じで用意。
src/main/java/org/littlewings/hazelcast/jet/EmbeddedJetServer.java

package org.littlewings.hazelcast.jet;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;

public class EmbeddedJetServer {
    public static void main(String... args) {
        JetInstance jet = Jet.newJetInstance();

        System.console().readLine("> stop enter.");

        jet.shutdown();
        Jet.shutdownAll();
    }
}

起動後にEnterを入力すると、即終了します。

Processorを書く

まずは、データを作成するProcessorを書いてみます。
src/main/java/org/littlewings/hazelcast/jet/RandomNumberGenerator.java

package org.littlewings.hazelcast.jet;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.function.DistributedSupplier;

public class RandomNumberGenerator extends AbstractProcessor {
    Random random = new Random();
    long continuousTime;
    TimeUnit continuousUnit;
    int limit;
    long startTime;

    public RandomNumberGenerator(long countinuousTime, TimeUnit continuousUnit, int limit, long startTime) {
        this.continuousTime = countinuousTime;
        this.continuousUnit = continuousUnit;
        this.limit = limit;
        this.startTime = startTime;
    }

    public static DistributedSupplier<Processor> source(long continuousTime, TimeUnit continuousUnit, int limit) {
        return () -> new RandomNumberGenerator(continuousTime, continuousUnit, limit, System.nanoTime());
    }

    @Override
    protected void init(Context context) throws Exception {
    }

    @Override
    public boolean complete() {
        System.out.printf("[%s] ===== complete =====%n", Thread.currentThread().getName());

        long sleepMillis = 1000L;
        int loopCount = (int) (60 * 1000 / sleepMillis);

        int n = random.nextInt(limit);

        IntStream
                .rangeClosed(1, loopCount)
                .forEach(i -> {
                    System.out.printf("[%s] genrated = %d%n", Thread.currentThread().getName(), n);

                    emit(new Num(n, LocalDateTime.now().atZone(ZoneOffset.systemDefault()).toInstant().toEpochMilli()));

                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        // ignoe
                    }
                });


        long elapsedTime = System.nanoTime() - startTime;

        System.out.printf("[%s] start: %d%n", Thread.currentThread().getName(), startTime);
        System.out.printf("[%s] rap: %s%n",
                Thread.currentThread().getName(),
                continuousUnit.convert(elapsedTime, TimeUnit.NANOSECONDS));

        return continuousUnit.convert(elapsedTime, TimeUnit.NANOSECONDS) >= continuousTime;
    }

    @Override
    public boolean isCooperative() {
        return false;
    }
}

生成したデータは、Numというクラスに入れて保持することにします。
src/main/java/org/littlewings/hazelcast/jet/Num.java

package org.littlewings.hazelcast.jet;

import java.io.Serializable;

public class Num implements Serializable {
    private static final long serialVersionUID = 1L;

    int n;
    long time;

    public Num(int n, long time) {
        this.n = n;
        this.time = time;
    }

    public int getN() {
        return n;
    }

    public long getTime() {
        return time;
    }
}

どうも、時間を表すlong型のフィールドを持っている必要があるようです。

Processorの方に戻りましょう。

Processorは、サンプルとこちらのドキュメントを見ながら作成。
Processor - Hazelcast Jet Reference Manual

Processorでは初期化処理を行うことができますが、今回は特になにも行っていません。

    @Override
    protected void init(Context context) throws Exception {
    }

どうも、Processorへの呼び出しサイクルの度に実行されている感じがします。本当に固定値の初期化をしたい場合は、コンストラクタで渡す方が
無難かもしれません。

complete。ここでfalseを返すと、Hazelcast Jetから継続して呼び出しが行われます。trueを返すと、終了します。

    @Override
    public boolean complete() {
        System.out.printf("[%s] ===== complete =====%n", Thread.currentThread().getName());

        long sleepMillis = 1000L;
        int loopCount = (int) (60 * 1000 / sleepMillis);

        int n = random.nextInt(limit);

        IntStream
                .rangeClosed(1, loopCount)
                .forEach(i -> {
                    System.out.printf("[%s] genrated = %d%n", Thread.currentThread().getName(), n);

                    emit(new Num(n, LocalDateTime.now().atZone(ZoneOffset.systemDefault()).toInstant().toEpochMilli()));

                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        // ignoe
                    }
                });


        long elapsedTime = System.nanoTime() - startTime;

        System.out.printf("[%s] start: %d%n", Thread.currentThread().getName(), startTime);
        System.out.printf("[%s] rap: %s%n",
                Thread.currentThread().getName(),
                continuousUnit.convert(elapsedTime, TimeUnit.NANOSECONDS));

        return continuousUnit.convert(elapsedTime, TimeUnit.NANOSECONDS) >= continuousTime;
    }

今回は、最初にランダムな数字を取得してから1分間その値を出力し続け、コンストラクタで制限として与えた時間を経過していた場合は
trueを、そうでない場合はfalseを返すように作成しました。

また、一気に連続してデータを作られても微妙なので、1秒スリープさせるようにしています。データの生成個数は、1分間の間のスリープ時間から
算出します。

        long sleepMillis = 1000L;
        int loopCount = (int) (60 * 1000 / sleepMillis);

isCooperativeメソッドについては、falseを返すと専用のメソッドをProcessorに割り当ててくれるみたいです。

    @Override
    public boolean isCooperative() {
        return false;
    }

ジョブ定義

あとは、これらを使うmainのジョブ定義。
src/main/java/org/littlewings/hazelcast/jet/StreamingRunner.java

package org.littlewings.hazelcast.jet;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.hazelcast.jet.*;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.processor.Processors;
import com.hazelcast.jet.processor.Sinks;

public class StreamingRunner {
    public static void main(String... args) throws ExecutionException, InterruptedException {
        JetInstance jet = Jet.newJetInstance();

        try {
            WindowDefinition windowDef =
                    WindowDefinition.slidingWindowDef(30_000L, 10_000L);
            //WindowDefinition.tumblingWindowDef(30_000L);

            DAG dag = new DAG();
            Vertex source =
                    dag.newVertex("source",
                            RandomNumberGenerator.source(3L, TimeUnit.MINUTES, 100));
            Vertex insertWatermarks =
                    dag.newVertex("insert-watermarks", Processors.insertWatermarks(Num::getTime, WatermarkPolicies.withFixedLag(1000), WatermarkEmissionPolicy.emitByFrame(windowDef)));
            Vertex summing1 =
                    dag.newVertex("summing1", Processors.accumulateByFrame(Num::getN, Num::getTime, TimestampKind.EVENT, windowDef, AggregateOperations.summingLong(Num::getN)));
            Vertex summing2 =
                    dag.newVertex("summing2", Processors.combineToSlidingWindow(windowDef, AggregateOperations.summingLong(Num::getN)));
            Vertex format =
                    dag.newVertex("format", formatOutput());
            Vertex sink =
                    dag.newVertex("sink", Sinks.writeFile("target/jet"));


            dag
                    .edge(Edge.between(source, insertWatermarks))
                    .edge(Edge.between(insertWatermarks, summing1)
                            .partitioned(Num::getN, Partitioner.HASH_CODE))
                    .edge(Edge.between(summing1, summing2)
                            .partitioned(Map.Entry<Long, Long>::getKey, Partitioner.HASH_CODE)
                            .distributed())
                    .edge(Edge.between(summing2, format).isolated())
                    .edge(Edge.between(format, sink).isolated());


            Future<Void> future = jet.newJob(dag).execute();
            future.get();
        } finally {
            jet.shutdown();
            Jet.shutdownAll();
        }
    }

    static DistributedSupplier<Processor> formatOutput() {
        return () -> {
            DateTimeFormatter timeFormat = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
            return Processors.map((TimestampedEntry<Long, Long> f) -> {
                String record = String.format("%s %5s %4d",
                    timeFormat.format(Instant.ofEpochMilli(f.getTimestamp()).atZone(ZoneId.systemDefault())),
                    f.getKey(), f.getValue());
                System.out.printf("[%s] record = %s%n", Thread.currentThread().getName(), record);
                return record;
            }).get();
        };
    }

}

基本的なジョブ定義の方法は、有限ソースのStreamを使った時とそう変わらないのですが、いくつか異なる概念が入ってきています。

まずはVertexの構成から。

最初は、Sliding Windowの定義。

            WindowDefinition windowDef =
                    WindowDefinition.slidingWindowDef(30_000L, 10_000L);
            //WindowDefinition.tumblingWindowDef(30_000L);

Sliding Windowについては、あらためてですが、こちらを。
Types of Windows Supported by Jet - Hazelcast Jet Reference Manual

「Sliding and Tumbling Window」として、説明が書かれています。

Sliding Windowは、Window SizeとStep Sizeを定義して、Windows Sizeの幅をStep Sizeの幅分ずらしながら進んでいくものになります。

オフィシャルドキュメントの図では、30秒のWindows Sizeと10秒のStep Sizeの例を出しています。

なお、Tumbling Windowは、Window SizeとStep Sizeが同じ大きさのSliding Windowのことを指します。

で、今回はドキュメントと同様Window Sizeを30秒、Step Sizeを10秒のSliding Window定義としました。

Streamの元データは、最初に作ったProcessorを使って定義します。今回は、3分間データを出力し続けます。

            DAG dag = new DAG();
            Vertex source =
                    dag.newVertex("source",
                            RandomNumberGenerator.source(3L, TimeUnit.MINUTES, 100));

次に、Watermarkを追加します。

            Vertex insertWatermarks =
                    dag.newVertex("insert-watermarks", Processors.insertWatermarks(Num::getTime, WatermarkPolicies.withFixedLag(1000), WatermarkEmissionPolicy.emitByFrame(windowDef)));

入力データの中から時間を表すlong値、WatermarkPolicy、WatermarkEmissionPolicyを指定して定義します。

WatermarkPolicyについては、こちら。
WatermarkPolicy - Hazelcast Jet Reference Manual

WatermarkPolicyにはいくつかパターンがありますが、今回は単純に固定のタイミングで切ってしまう「With Fixed Lag」なPolicyを使用します。

また、WatermarkEmissionPolicy#emitByFrameで指定されたWindowDefinitionに従い、Watermarkが挿入された後は前のWatermarkよりも高いフレームに
属することを保証するように定義します。

続いて、データを集約します。最初に、Local Nodeで集約し、続いてNode間で集約を行います。集約する値はNum#getNで、時間はNum#getTimeで行います。

            Vertex summing1 =
                    dag.newVertex("summing1", Processors.accumulateByFrame(Num::getN, Num::getTime, TimestampKind.EVENT, windowDef, AggregateOperations.summingLong(Num::getN)));
            Vertex summing2 =
                    dag.newVertex("summing2", Processors.combineToSlidingWindow(windowDef, AggregateOperations.summingLong(Num::getN)));

最後は、フォーマットして結果をファイルに出力。

            Vertex format =
                    dag.newVertex("format", formatOutput());
            Vertex sink =
                    dag.newVertex("sink", Sinks.writeFile("target/jet"));

フォーマットするProcessorは、サンプルのものをほぼそのまま持ってきました。

    static DistributedSupplier<Processor> formatOutput() {
        return () -> {
            DateTimeFormatter timeFormat = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
            return Processors.map((TimestampedEntry<Long, Long> f) -> {
                String record = String.format("%s %5s %4d",
                    timeFormat.format(Instant.ofEpochMilli(f.getTimestamp()).atZone(ZoneId.systemDefault())),
                    f.getKey(), f.getValue());
                System.out.printf("[%s] record = %s%n", Thread.currentThread().getName(), record);
                return record;
            }).get();
        };
    }

そして、DAGを構成します。

            dag
                    .edge(Edge.between(source, insertWatermarks))
                    .edge(Edge.between(insertWatermarks, summing1)
                            .partitioned(Num::getN, Partitioner.HASH_CODE))
                    .edge(Edge.between(summing1, summing2)
                            .partitioned(Map.Entry<Long, Long>::getKey, Partitioner.HASH_CODE)
                            .distributed())
                    .edge(Edge.between(summing2, format).isolated())
                    .edge(Edge.between(format, sink).isolated());

summing1はLocalでPartitioned、summing2はDistributedでPartitionedです。summing2になった時には、TimestampedEntryというMap.Entryのサブクラスが
渡ってきます。前段のPartitionedのキーと合わせるようにしておけばOKです。

Edge#isolatedというのは、EdgeのRoutingPolicyのうち、特定のDownstreamのProcessorがひとつのUpstream Processorからデータを受け取ることを
保証するものです。似たものにUNICASTがあるのですが、こちらはBack Pressureの設定が可能なようです。

で、実行。

            Future<Void> future = jet.newJob(dag).execute();
            future.get();

確認

出力結果はローカルディレクトリに出力するようにしてしまったので、各Nodeの実行ディレクトリを分けようと思います。

構成は、浮いているNode×2、ジョブのエントリポイントとなるNode×1です。

依存ライブラリをコピーして、パッケージング。

$ mvn dependency:copy-dependencies -DoutputDirectory=lib
$ mvn package

実行ディレクトリを作成。

$ mkdir runner server1 server2

この中で、それぞれプログラムを起動します。

## Node 1
$ cd server1
$ java -cp '../lib/*:../target/embedded-streaming-getting-started-0.0.1-SNAPSHOT.jar' org.littlewings.hazelcast.jet.EmbeddedJetServer


## Node 2
$ cd server2
$ java -cp '../lib/*:../target/embedded-streaming-getting-started-0.0.1-SNAPSHOT.jar' org.littlewings.hazelcast.jet.EmbeddedJetServer


## Node 3(Job)
$ cd runner
$ java -cp '../lib/*:../target/embedded-streaming-getting-started-0.0.1-SNAPSHOT.jar' org.littlewings.hazelcast.jet.StreamingRunner

ジョブは、3分後に終了します。

途中、こんなログが出てくると思うので、様子を眺めておきましょう。

## データ作成
[hz._hzInstance_1_jet.jet.blocking.thread-3] genrated = 51
[hz._hzInstance_1_jet.jet.blocking.thread-5] genrated = 54
[hz._hzInstance_1_jet.jet.blocking.thread-4] genrated = 80
[hz._hzInstance_1_jet.jet.blocking.thread-7] genrated = 62
[hz._hzInstance_1_jet.jet.blocking.thread-0] genrated = 39
[hz._hzInstance_1_jet.jet.blocking.thread-1] genrated = 53
[hz._hzInstance_1_jet.jet.blocking.thread-6] genrated = 25
[hz._hzInstance_1_jet.jet.blocking.thread-6] genrated = 25
[hz._hzInstance_1_jet.jet.blocking.thread-5] genrated = 54
[hz._hzInstance_1_jet.jet.blocking.thread-2] genrated = 56


## フォーマット
[hz._hzInstance_1_jet.jet.cooperative.thread-0] record = 23:12:30.000    37  148
[hz._hzInstance_1_jet.jet.cooperative.thread-0] record = 23:12:30.000    25  200
[hz._hzInstance_1_jet.jet.cooperative.thread-3] record = 23:12:30.000    29  116
[hz._hzInstance_1_jet.jet.cooperative.thread-2] record = 23:12:30.000     1    4
[hz._hzInstance_1_jet.jet.cooperative.thread-6] record = 23:12:30.000    19   76
[hz._hzInstance_1_jet.jet.cooperative.thread-6] record = 23:12:30.000    62  496

ちなみに、同一Nodeから並行してデータ出力が行われているように見えるかもしれませんが、これはVertex#localParallelismを調整
しなかったからですね…。Getting Startedでは、1にしているようなのでお好みで。

で、結果確認。

例えば、出力されたデータが「87」の場合の結果。

$ grep -r ' 87 ' runner/target/ server*/target
server2/target/jet/8:23:12:30.000    87  348
server2/target/jet/8:23:12:40.000    87 1218
server2/target/jet/8:23:12:50.000    87 2088
server2/target/jet/8:23:13:00.000    87 2610
server2/target/jet/8:23:13:10.000    87 2610
server2/target/jet/8:23:13:20.000    87 2610
server2/target/jet/8:23:13:30.000    87 2610
server2/target/jet/8:23:13:40.000    87 2610
server2/target/jet/8:23:13:50.000    87 2610
server2/target/jet/8:23:14:00.000    87 2610
server2/target/jet/8:23:14:10.000    87 2610
server2/target/jet/8:23:14:20.000    87 2610
server2/target/jet/8:23:14:30.000    87 2262
server2/target/jet/8:23:14:40.000    87 1392
server2/target/jet/8:23:14:50.000    87  522

87×10は870、Windows Sizeは30なので、2,670のところはデータが期間分フルで揃っているところですね。最初と最後で徐々に数字が少なくなっているのは
開始と終了のタイミングがあるのでデータが減るからですね。

他のデータについて確認しても、似たような結果になります。

また、今回はSliding WindowでやってみましたがTumbling Windowも試してみましょう。

            WindowDefinition windowDef =
                    // WindowDefinition.slidingWindowDef(30_000L, 10_000L);
                    WindowDefinition.tumblingWindowDef(30_000L);

結果。

$ grep -r ' 20 ' runner/target/ server*/target
runner/target/jet/1:23:21:30.000    20   40
runner/target/jet/1:23:22:00.000    20  600
runner/target/jet/1:23:22:30.000    20  600
runner/target/jet/1:23:23:00.000    20  600
runner/target/jet/1:23:23:30.000    20  560

20×30で、600ですね。

とりあえず、OKそうな。

まとめ

Hazelcast Jetを使って、StreamとSliding Windowの初歩を試してみました。

Stream処理に全然慣れていないので、用語や概念を覚えるところからだったりしますが、ゆっくりやっていこうと思います。

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-jet-examples/tree/master/embedded-streaming-getting-started