Hazelcast Jetのドキュメントには、2つのGetting Startedが載っています。
Hazelcast Jet 101 - Word Counting Batch Job - Hazelcast Jet Reference Manual
Hazelcast Jet 102 - Trade Monitoring Streaming Job - Hazelcast Jet Reference Manual
少し前に、Word Countの方…Distributed Stream APIを使って遊んでいました。
今回は、Streamingの方を見ていきたいと思います。
まずは、Getting Started(Trade Monitoring Streaming Job)に書かれている内容をざっくりと見ていってみます。
Streaming?
Word CountのGetting Startedでは、あらかじめ登録済みのデータに対してDistributed Stream APIを使用して集計処理を
したものでした。
今回、Streamingとなると、ジョブの組み方はまあ近いのですが、今度はそのStreamが無限Streamとなります。
Sliding Window?
Word Countの例では、入力となるデータを全部集計して結果を出しました。無限Streamの場合は、そうはいかないのでなにか別の方法でデータを
集計する必要があります。
その方法のひとつが、Sliding Windowです。Sliding Windowでは、ある期間で区切った集計結果を算出します。
Types of Windows Supported by Jet - Hazelcast Jet Reference Manual
Time Ordering?
入力データがあったというイベントを監視する際に、イベントの発生時間は通常はStreamに登録されるデータのひとつのフィールドとして
書き込まれます。ただ、その時間どおりにStream内で順序付けされているという保証はありません。
ネットワークの遅延など、様々な理由でバラバラの時間でシステム側に届いたりするでしょう。
これは、Sliding Windowの定義を複雑にします。このようなStreamの場合は、(少なくとも部分的に)データを時間を持ったフィールドで
ソートする必要があります。これは、計算コストが高くなることを意味します。
Watermark?
Time Orderingについての問題を解決するために、Watermarkの概念を導入します。これは、Streamに挿入されたタイムスタンプ付きのアイテムで、
「この時点からこれ以下のタイムスタンプのデータはもうありません」という意味になります。
とはいえ、Watermarkが挿入された後に、Watermarkに反するデータが届くことはあります。その場合は、そのデータは「遅すぎたデータ」として
扱われ、除外されることになります。
「Watermark」という言葉は、2つの意味で使われています。
- DAG Pipeline内の特定のプロパティとして … Watermarkの現在の値
- データ項目として … ProcessorがWartermarkを受信したこと
Watermarkは、「イベント時間を示す時間」と考えることができ、ProcessorのWatermarkの値はWartermarkを受け取ると前進します。
お題
とまあ、ざっくりと書いてみましたが、ここから先は実際に動かして試してみましょう。
Getting Startedの例もいいのですが、今回はジョブを全部自分で組むことにします。
こんな感じで。
- ランダムな数字を指定された時間生成し続けるProcessor(1分間、同じ数字を生成し続ける)
- Watermarkの挿入
- Local Nodeで集計
- Nodeをまたいで集計
- データのフォーマット
- データのファイル出力
基本的には、Getting Startedからデータの生成部のみを変えた感じですね。
また、サンプルとしてはGetting Startedの他に、こちらも参照しています。
https://github.com/hazelcast/hazelcast-jet-code-samples/tree/0.4-maintenance/streaming/trade-generator/src/main/java/com/hazelcast/jet/sample/tradegenerator
https://github.com/hazelcast/hazelcast-jet-code-samples/blob/0.4-maintenance/streaming/stock-exchange/src/main/java/refman/StockExchangeRefMan.java
準備
Maven依存関係は、こんな感じ。
<dependency> <groupId>com.hazelcast.jet</groupId> <artifactId>hazelcast-jet-core</artifactId> <version>0.4</version> </dependency>
あと、分散実行のために浮いていてもらうNodeは、こんな感じで用意。
src/main/java/org/littlewings/hazelcast/jet/EmbeddedJetServer.java
package org.littlewings.hazelcast.jet; import com.hazelcast.jet.Jet; import com.hazelcast.jet.JetInstance; public class EmbeddedJetServer { public static void main(String... args) { JetInstance jet = Jet.newJetInstance(); System.console().readLine("> stop enter."); jet.shutdown(); Jet.shutdownAll(); } }
起動後にEnterを入力すると、即終了します。
Processorを書く
まずは、データを作成するProcessorを書いてみます。
src/main/java/org/littlewings/hazelcast/jet/RandomNumberGenerator.java
package org.littlewings.hazelcast.jet; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import com.hazelcast.jet.AbstractProcessor; import com.hazelcast.jet.Processor; import com.hazelcast.jet.function.DistributedSupplier; public class RandomNumberGenerator extends AbstractProcessor { Random random = new Random(); long continuousTime; TimeUnit continuousUnit; int limit; long startTime; public RandomNumberGenerator(long countinuousTime, TimeUnit continuousUnit, int limit, long startTime) { this.continuousTime = countinuousTime; this.continuousUnit = continuousUnit; this.limit = limit; this.startTime = startTime; } public static DistributedSupplier<Processor> source(long continuousTime, TimeUnit continuousUnit, int limit) { return () -> new RandomNumberGenerator(continuousTime, continuousUnit, limit, System.nanoTime()); } @Override protected void init(Context context) throws Exception { } @Override public boolean complete() { System.out.printf("[%s] ===== complete =====%n", Thread.currentThread().getName()); long sleepMillis = 1000L; int loopCount = (int) (60 * 1000 / sleepMillis); int n = random.nextInt(limit); IntStream .rangeClosed(1, loopCount) .forEach(i -> { System.out.printf("[%s] genrated = %d%n", Thread.currentThread().getName(), n); emit(new Num(n, LocalDateTime.now().atZone(ZoneOffset.systemDefault()).toInstant().toEpochMilli())); try { TimeUnit.MILLISECONDS.sleep(sleepMillis); } catch (InterruptedException e) { // ignoe } }); long elapsedTime = System.nanoTime() - startTime; System.out.printf("[%s] start: %d%n", Thread.currentThread().getName(), startTime); System.out.printf("[%s] rap: %s%n", Thread.currentThread().getName(), continuousUnit.convert(elapsedTime, TimeUnit.NANOSECONDS)); return continuousUnit.convert(elapsedTime, TimeUnit.NANOSECONDS) >= continuousTime; } @Override public boolean isCooperative() { return false; } }
生成したデータは、Numというクラスに入れて保持することにします。
src/main/java/org/littlewings/hazelcast/jet/Num.java
package org.littlewings.hazelcast.jet; import java.io.Serializable; public class Num implements Serializable { private static final long serialVersionUID = 1L; int n; long time; public Num(int n, long time) { this.n = n; this.time = time; } public int getN() { return n; } public long getTime() { return time; } }
どうも、時間を表すlong型のフィールドを持っている必要があるようです。
Processorの方に戻りましょう。
Processorは、サンプルとこちらのドキュメントを見ながら作成。
Processor - Hazelcast Jet Reference Manual
Processorでは初期化処理を行うことができますが、今回は特になにも行っていません。
@Override protected void init(Context context) throws Exception { }
どうも、Processorへの呼び出しサイクルの度に実行されている感じがします。本当に固定値の初期化をしたい場合は、コンストラクタで渡す方が
無難かもしれません。
complete。ここでfalseを返すと、Hazelcast Jetから継続して呼び出しが行われます。trueを返すと、終了します。
@Override public boolean complete() { System.out.printf("[%s] ===== complete =====%n", Thread.currentThread().getName()); long sleepMillis = 1000L; int loopCount = (int) (60 * 1000 / sleepMillis); int n = random.nextInt(limit); IntStream .rangeClosed(1, loopCount) .forEach(i -> { System.out.printf("[%s] genrated = %d%n", Thread.currentThread().getName(), n); emit(new Num(n, LocalDateTime.now().atZone(ZoneOffset.systemDefault()).toInstant().toEpochMilli())); try { TimeUnit.MILLISECONDS.sleep(sleepMillis); } catch (InterruptedException e) { // ignoe } }); long elapsedTime = System.nanoTime() - startTime; System.out.printf("[%s] start: %d%n", Thread.currentThread().getName(), startTime); System.out.printf("[%s] rap: %s%n", Thread.currentThread().getName(), continuousUnit.convert(elapsedTime, TimeUnit.NANOSECONDS)); return continuousUnit.convert(elapsedTime, TimeUnit.NANOSECONDS) >= continuousTime; }
今回は、最初にランダムな数字を取得してから1分間その値を出力し続け、コンストラクタで制限として与えた時間を経過していた場合は
trueを、そうでない場合はfalseを返すように作成しました。
また、一気に連続してデータを作られても微妙なので、1秒スリープさせるようにしています。データの生成個数は、1分間の間のスリープ時間から
算出します。
long sleepMillis = 1000L; int loopCount = (int) (60 * 1000 / sleepMillis);
isCooperativeメソッドについては、falseを返すと専用のメソッドをProcessorに割り当ててくれるみたいです。
@Override public boolean isCooperative() { return false; }
ジョブ定義
あとは、これらを使うmainのジョブ定義。
src/main/java/org/littlewings/hazelcast/jet/StreamingRunner.java
package org.littlewings.hazelcast.jet; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import com.hazelcast.jet.*; import com.hazelcast.jet.function.DistributedSupplier; import com.hazelcast.jet.processor.Processors; import com.hazelcast.jet.processor.Sinks; public class StreamingRunner { public static void main(String... args) throws ExecutionException, InterruptedException { JetInstance jet = Jet.newJetInstance(); try { WindowDefinition windowDef = WindowDefinition.slidingWindowDef(30_000L, 10_000L); //WindowDefinition.tumblingWindowDef(30_000L); DAG dag = new DAG(); Vertex source = dag.newVertex("source", RandomNumberGenerator.source(3L, TimeUnit.MINUTES, 100)); Vertex insertWatermarks = dag.newVertex("insert-watermarks", Processors.insertWatermarks(Num::getTime, WatermarkPolicies.withFixedLag(1000), WatermarkEmissionPolicy.emitByFrame(windowDef))); Vertex summing1 = dag.newVertex("summing1", Processors.accumulateByFrame(Num::getN, Num::getTime, TimestampKind.EVENT, windowDef, AggregateOperations.summingLong(Num::getN))); Vertex summing2 = dag.newVertex("summing2", Processors.combineToSlidingWindow(windowDef, AggregateOperations.summingLong(Num::getN))); Vertex format = dag.newVertex("format", formatOutput()); Vertex sink = dag.newVertex("sink", Sinks.writeFile("target/jet")); dag .edge(Edge.between(source, insertWatermarks)) .edge(Edge.between(insertWatermarks, summing1) .partitioned(Num::getN, Partitioner.HASH_CODE)) .edge(Edge.between(summing1, summing2) .partitioned(Map.Entry<Long, Long>::getKey, Partitioner.HASH_CODE) .distributed()) .edge(Edge.between(summing2, format).isolated()) .edge(Edge.between(format, sink).isolated()); Future<Void> future = jet.newJob(dag).execute(); future.get(); } finally { jet.shutdown(); Jet.shutdownAll(); } } static DistributedSupplier<Processor> formatOutput() { return () -> { DateTimeFormatter timeFormat = DateTimeFormatter.ofPattern("HH:mm:ss.SSS"); return Processors.map((TimestampedEntry<Long, Long> f) -> { String record = String.format("%s %5s %4d", timeFormat.format(Instant.ofEpochMilli(f.getTimestamp()).atZone(ZoneId.systemDefault())), f.getKey(), f.getValue()); System.out.printf("[%s] record = %s%n", Thread.currentThread().getName(), record); return record; }).get(); }; } }
基本的なジョブ定義の方法は、有限ソースのStreamを使った時とそう変わらないのですが、いくつか異なる概念が入ってきています。
まずはVertexの構成から。
最初は、Sliding Windowの定義。
WindowDefinition windowDef =
WindowDefinition.slidingWindowDef(30_000L, 10_000L);
//WindowDefinition.tumblingWindowDef(30_000L);
Sliding Windowについては、あらためてですが、こちらを。
Types of Windows Supported by Jet - Hazelcast Jet Reference Manual
「Sliding and Tumbling Window」として、説明が書かれています。
Sliding Windowは、Window SizeとStep Sizeを定義して、Windows Sizeの幅をStep Sizeの幅分ずらしながら進んでいくものになります。
オフィシャルドキュメントの図では、30秒のWindows Sizeと10秒のStep Sizeの例を出しています。
なお、Tumbling Windowは、Window SizeとStep Sizeが同じ大きさのSliding Windowのことを指します。
で、今回はドキュメントと同様Window Sizeを30秒、Step Sizeを10秒のSliding Window定義としました。
Streamの元データは、最初に作ったProcessorを使って定義します。今回は、3分間データを出力し続けます。
DAG dag = new DAG(); Vertex source = dag.newVertex("source", RandomNumberGenerator.source(3L, TimeUnit.MINUTES, 100));
次に、Watermarkを追加します。
Vertex insertWatermarks = dag.newVertex("insert-watermarks", Processors.insertWatermarks(Num::getTime, WatermarkPolicies.withFixedLag(1000), WatermarkEmissionPolicy.emitByFrame(windowDef)));
入力データの中から時間を表すlong値、WatermarkPolicy、WatermarkEmissionPolicyを指定して定義します。
WatermarkPolicyについては、こちら。
WatermarkPolicy - Hazelcast Jet Reference Manual
WatermarkPolicyにはいくつかパターンがありますが、今回は単純に固定のタイミングで切ってしまう「With Fixed Lag」なPolicyを使用します。
また、WatermarkEmissionPolicy#emitByFrameで指定されたWindowDefinitionに従い、Watermarkが挿入された後は前のWatermarkよりも高いフレームに
属することを保証するように定義します。
続いて、データを集約します。最初に、Local Nodeで集約し、続いてNode間で集約を行います。集約する値はNum#getNで、時間はNum#getTimeで行います。
Vertex summing1 = dag.newVertex("summing1", Processors.accumulateByFrame(Num::getN, Num::getTime, TimestampKind.EVENT, windowDef, AggregateOperations.summingLong(Num::getN))); Vertex summing2 = dag.newVertex("summing2", Processors.combineToSlidingWindow(windowDef, AggregateOperations.summingLong(Num::getN)));
最後は、フォーマットして結果をファイルに出力。
Vertex format = dag.newVertex("format", formatOutput()); Vertex sink = dag.newVertex("sink", Sinks.writeFile("target/jet"));
フォーマットするProcessorは、サンプルのものをほぼそのまま持ってきました。
static DistributedSupplier<Processor> formatOutput() { return () -> { DateTimeFormatter timeFormat = DateTimeFormatter.ofPattern("HH:mm:ss.SSS"); return Processors.map((TimestampedEntry<Long, Long> f) -> { String record = String.format("%s %5s %4d", timeFormat.format(Instant.ofEpochMilli(f.getTimestamp()).atZone(ZoneId.systemDefault())), f.getKey(), f.getValue()); System.out.printf("[%s] record = %s%n", Thread.currentThread().getName(), record); return record; }).get(); }; }
そして、DAGを構成します。
dag .edge(Edge.between(source, insertWatermarks)) .edge(Edge.between(insertWatermarks, summing1) .partitioned(Num::getN, Partitioner.HASH_CODE)) .edge(Edge.between(summing1, summing2) .partitioned(Map.Entry<Long, Long>::getKey, Partitioner.HASH_CODE) .distributed()) .edge(Edge.between(summing2, format).isolated()) .edge(Edge.between(format, sink).isolated());
summing1はLocalでPartitioned、summing2はDistributedでPartitionedです。summing2になった時には、TimestampedEntryというMap.Entryのサブクラスが
渡ってきます。前段のPartitionedのキーと合わせるようにしておけばOKです。
Edge#isolatedというのは、EdgeのRoutingPolicyのうち、特定のDownstreamのProcessorがひとつのUpstream Processorからデータを受け取ることを
保証するものです。似たものにUNICASTがあるのですが、こちらはBack Pressureの設定が可能なようです。
で、実行。
Future<Void> future = jet.newJob(dag).execute(); future.get();
確認
出力結果はローカルディレクトリに出力するようにしてしまったので、各Nodeの実行ディレクトリを分けようと思います。
構成は、浮いているNode×2、ジョブのエントリポイントとなるNode×1です。
依存ライブラリをコピーして、パッケージング。
$ mvn dependency:copy-dependencies -DoutputDirectory=lib $ mvn package
実行ディレクトリを作成。
$ mkdir runner server1 server2
この中で、それぞれプログラムを起動します。
## Node 1 $ cd server1 $ java -cp '../lib/*:../target/embedded-streaming-getting-started-0.0.1-SNAPSHOT.jar' org.littlewings.hazelcast.jet.EmbeddedJetServer ## Node 2 $ cd server2 $ java -cp '../lib/*:../target/embedded-streaming-getting-started-0.0.1-SNAPSHOT.jar' org.littlewings.hazelcast.jet.EmbeddedJetServer ## Node 3(Job) $ cd runner $ java -cp '../lib/*:../target/embedded-streaming-getting-started-0.0.1-SNAPSHOT.jar' org.littlewings.hazelcast.jet.StreamingRunner
ジョブは、3分後に終了します。
途中、こんなログが出てくると思うので、様子を眺めておきましょう。
## データ作成 [hz._hzInstance_1_jet.jet.blocking.thread-3] genrated = 51 [hz._hzInstance_1_jet.jet.blocking.thread-5] genrated = 54 [hz._hzInstance_1_jet.jet.blocking.thread-4] genrated = 80 [hz._hzInstance_1_jet.jet.blocking.thread-7] genrated = 62 [hz._hzInstance_1_jet.jet.blocking.thread-0] genrated = 39 [hz._hzInstance_1_jet.jet.blocking.thread-1] genrated = 53 [hz._hzInstance_1_jet.jet.blocking.thread-6] genrated = 25 [hz._hzInstance_1_jet.jet.blocking.thread-6] genrated = 25 [hz._hzInstance_1_jet.jet.blocking.thread-5] genrated = 54 [hz._hzInstance_1_jet.jet.blocking.thread-2] genrated = 56 ## フォーマット [hz._hzInstance_1_jet.jet.cooperative.thread-0] record = 23:12:30.000 37 148 [hz._hzInstance_1_jet.jet.cooperative.thread-0] record = 23:12:30.000 25 200 [hz._hzInstance_1_jet.jet.cooperative.thread-3] record = 23:12:30.000 29 116 [hz._hzInstance_1_jet.jet.cooperative.thread-2] record = 23:12:30.000 1 4 [hz._hzInstance_1_jet.jet.cooperative.thread-6] record = 23:12:30.000 19 76 [hz._hzInstance_1_jet.jet.cooperative.thread-6] record = 23:12:30.000 62 496
ちなみに、同一Nodeから並行してデータ出力が行われているように見えるかもしれませんが、これはVertex#localParallelismを調整
しなかったからですね…。Getting Startedでは、1にしているようなのでお好みで。
で、結果確認。
例えば、出力されたデータが「87」の場合の結果。
$ grep -r ' 87 ' runner/target/ server*/target server2/target/jet/8:23:12:30.000 87 348 server2/target/jet/8:23:12:40.000 87 1218 server2/target/jet/8:23:12:50.000 87 2088 server2/target/jet/8:23:13:00.000 87 2610 server2/target/jet/8:23:13:10.000 87 2610 server2/target/jet/8:23:13:20.000 87 2610 server2/target/jet/8:23:13:30.000 87 2610 server2/target/jet/8:23:13:40.000 87 2610 server2/target/jet/8:23:13:50.000 87 2610 server2/target/jet/8:23:14:00.000 87 2610 server2/target/jet/8:23:14:10.000 87 2610 server2/target/jet/8:23:14:20.000 87 2610 server2/target/jet/8:23:14:30.000 87 2262 server2/target/jet/8:23:14:40.000 87 1392 server2/target/jet/8:23:14:50.000 87 522
87×10は870、Windows Sizeは30なので、2,670のところはデータが期間分フルで揃っているところですね。最初と最後で徐々に数字が少なくなっているのは
開始と終了のタイミングがあるのでデータが減るからですね。
他のデータについて確認しても、似たような結果になります。
また、今回はSliding WindowでやってみましたがTumbling Windowも試してみましょう。
WindowDefinition windowDef =
// WindowDefinition.slidingWindowDef(30_000L, 10_000L);
WindowDefinition.tumblingWindowDef(30_000L);
結果。
$ grep -r ' 20 ' runner/target/ server*/target runner/target/jet/1:23:21:30.000 20 40 runner/target/jet/1:23:22:00.000 20 600 runner/target/jet/1:23:22:30.000 20 600 runner/target/jet/1:23:23:00.000 20 600 runner/target/jet/1:23:23:30.000 20 560
20×30で、600ですね。
とりあえず、OKそうな。
まとめ
Hazelcast Jetを使って、StreamとSliding Windowの初歩を試してみました。
Stream処理に全然慣れていないので、用語や概念を覚えるところからだったりしますが、ゆっくりやっていこうと思います。
今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-jet-examples/tree/master/embedded-streaming-getting-started