CLOVER🍀

That was when it all began.

First Steps with Hazelcast Jet (Client/Server Distributed Stream API)

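The other day I used Hazelcast Jet for the first time and ran a Word Count with its distributed Stream API:

First Steps with Hazelcast Jet (Embedded Distributed Stream API) - CLOVER

That time I wrote it in Embedded Mode. This time, I'd like to do the same thing in Client/Server Mode.

The content is almost the same as last time, so I'll skip explanations that would just repeat it.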

Setup

Here is the Maven dependency I used this time.

        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet-core</artifactId>
            <version>0.4</version>
        </dependency>

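As I wrote in the Embedded post, choosing the "hazelcast-jet" artifact is fine too. In that case, the dependency
comes in as a single JAR bundled with the Maven Shade Plugin.

Also, Hazelcast Jet doesn't split the modules you normally declare into client and server variants. Whether you use
Embedded Mode or Client/Server Mode, you add the same dependency.

Some of the APIs you call do differ, though.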

The task

Here's the task for this post.

  • Count words in the text of "Jet 0.4 is Released | Hazelcast Blog"
  • Use the Stream API for the WordCount
  • Sort the result and take the top 20 entries
  • Run the Hazelcast Jet nodes as one Client Node and three Server Nodes
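For reference, the same word count can run inside a single JVM with plain java.util.stream. That makes the steps of the task easy to check without a cluster. This is a sketch of my own, not part of the Jet sample. The class name LocalWordCount is hypothetical, and Collectors.toMap(..., Integer::sum) stands in for DistributedCollectors.toIMap with the same key, value and merge functions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical single-JVM analogue of the Jet word-count job; no Hazelcast involved.
class LocalWordCount {
    // Same punctuation set the Jet job replaces with spaces (\u201D is the closing double quote)
    static final String PUNCTUATION = "[=()\"',.:;\u201D-]";

    static Map<String, Integer> countWords(Stream<String> lines) {
        return lines
                .map(line -> line.replaceAll(PUNCTUATION, " ").trim())
                .filter(line -> !line.isEmpty())
                .flatMap(line -> Arrays.stream(line.split(" +")))
                .map(String::toLowerCase)
                // Same key/value/merge shape as DistributedCollectors.toIMap(name, key, value, merge)
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum));
    }

    // Sort by count descending and keep the first n entries, like the top-20 query
    static List<Map.Entry<String, Integer>> topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String... args) {
        Map<String, Integer> counts = countWords(Stream.of(
                "Jet 0.4 is released.",
                "",
                "Hazelcast Jet: streaming, for Jet users"));
        topN(counts, 3).forEach(e -> System.out.println("  " + e));
    }
}
```

As in the Jet version, "0.4" is split into "0" and "4", because "." is treated as punctuation.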

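Last time everything was Embedded. This time the Client launches the job and everything else runs on the Servers.
You can think of a Server here as more or less the same thing as Embedded.

As before, the text for the WordCount looks like this.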

$ head -n 20 src/main/resources/jet-0_4-release.txt
We are happy to announce the release of Hazelcast Jet 0.4. This is the first major release since our inital version and has many improvements bringing us closer to our final vision for Jet:

Improved streaming support including windowing support with event-time semantics.
Out of the box support for tumbling, sliding and session window aggregations.
New AggregateOperation abstraction with several built-in ones including count, average, sum, min, max and linear regression.
Hazelcast version updated to 3.8.2 and Hazelcast IMDG is now shaded inside hazelcast-jet.jar.
Several built-in diagnostic processors and unit test support for writing custom processors.
Many new code samples including several streaming examples and enrichment and co-group for batch operations.
New sources and sinks including ICache, socket and file.
Windowing Support with Event-Time semantics

The biggest improvement is the addition of tools for using Jet on infinite streams of data. Dealing with streaming data is fundamentally different than batch or micro-batch processing as both input and output is continuous. Most streaming computations also deal with some notion of time where you are interested in how a value changes over time. The typical way to deal with streaming data is to look at it in terms of “windows”, where a window represents a slice of the data stream, usually constrained for a period of time.

Jet 0.4 adds several processors which deal specifically with aggregation of streaming data into windows. Types of windowing supported can be summarised as follows:

Tumbling Windows: Fixed-size, non-overlapping and contiguous intervals. For example a tumbling window of 1 minute would mean that each window is of 1 minute length, are not overlapping, and contiguous to each other.
Sliding Windows: A variation on tumbling windows, sliding windows can have overlapping elements and slide in fixed increments as time advances.
Session Windows: Typically captures a burst of events which are separated by a period of inactivity. This can be used to window events by a user session, for example.
Jet also supports the notion of “event-time” where events can have their own timestamp and can arrive out of order. This is achieved by inserting watermarks into the stream of events which drive the passage of time forward.

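Sample programs

I'll prepare the following two sample programs.

  • An embedded Hazelcast Jet Server that simply stays up
  • A Client that loads data into the Hazelcast Jet cluster and runs the WordCount

You may wonder whether the Server needs any code at all, but it does. The reason is covered at the end.

First, the simple embedded Server. It's exactly the same as last time.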
src/main/java/org/littlewings/hazelcast/jet/EmbeddedJetServer.java

package org.littlewings.hazelcast.jet;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;

public class EmbeddedJetServer {
    public static void main(String... args) {
        JetInstance jet = Jet.newJetInstance();

        System.console().readLine("> stop enter.");

        jet.shutdown();
        Jet.shutdownAll();
    }
}

Pressing Enter shuts it down.
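One caveat with System.console(): it returns null when the JVM has no interactive console, for example with redirected input or inside some IDEs. In that case the readLine call above fails with a NullPointerException. Here is a sketch of a fallback that reads a line from an InputStream instead. WaitForEnter and waitForEnter are hypothetical names, not part of Hazelcast Jet.

```java
import java.io.BufferedReader;
import java.io.Console;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for EmbeddedJetServer: wait for Enter with or without a console.
class WaitForEnter {
    // Returns the line that was read, or null at end of input
    static String waitForEnter(String prompt, Console console, InputStream fallback) {
        if (console != null) {
            return console.readLine("%s", prompt);
        }
        System.out.print(prompt);
        System.out.flush();
        try {
            // The reader is deliberately not closed, so System.in stays open
            BufferedReader reader =
                    new BufferedReader(new InputStreamReader(fallback, StandardCharsets.UTF_8));
            return reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String... args) {
        // Would replace System.console().readLine(...) between newJetInstance() and shutdown()
        waitForEnter("> stop enter.", System.console(), System.in);
        System.out.println("stopping");
    }
}
```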

The Word Count side is also basically the same as last time.
src/main/java/org/littlewings/hazelcast/jet/WordCountRunner.java

package org.littlewings.hazelcast.jet;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.stream.DistributedCollectors;
import com.hazelcast.jet.stream.IStreamMap;

public class WordCountRunner {
    public static void main(String... args) {
        JetInstance jet = Jet.newJetClient();

        IStreamMap<Integer, String> map = jet.getMap("source");

        try (InputStream is = WordCountRunner.class.getClassLoader().getResourceAsStream("jet-0_4-release.txt");
             InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
             BufferedReader reader = new BufferedReader(isr);
             Stream<String> lines = reader.lines()) {
            AtomicInteger count = new AtomicInteger();
            lines.forEach(line -> map.put(count.incrementAndGet(), line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        IStreamMap<String, Integer> wordCount =
                map
                        .stream()
                        .map(e -> e.getValue().replaceAll("[=()\"',.:;”-]", " ").trim())
                        .filter(line -> !line.isEmpty())
                        .flatMap(line -> Arrays.stream(line.split(" +")))
                        .map(word -> {
                            String lc = word.toLowerCase();
                            System.out.printf("[%s] %s%n", Thread.currentThread().getName(), lc);
                            return lc;
                        })
                        .collect(DistributedCollectors.toIMap(
                                "word-count-map",
                                word -> word,
                                word -> 1,
                                Integer::sum)
                        );

        List<Map.Entry<String, Integer>> top20 =
                wordCount
                        .stream()
                        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                        .limit(20)
                        .collect(DistributedCollectors.toList());

        System.out.println("result:");
        top20.forEach(e -> System.out.println("  " + e));

        System.out.println();
        System.out.println("source map size = " + jet.getMap("source").size());
        System.out.println("word-count-map size = " + jet.getMap("word-count-map").size());

        jet.shutdown();
        Jet.shutdownAll();
    }
}

The only difference is this line.

        JetInstance jet = Jet.newJetClient();

This runs Hazelcast Jet as a Hazelcast Client.

Beyond that, you pass Functions and Collectors to the Stream API as Serializable objects. Hazelcast Jet takes care of
these types for you.
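What "Serializable" means here can be checked with plain Java serialization, with no Hazelcast involved. A regular lambda's generated class doesn't implement Serializable. An intersection cast makes the compiler emit a serializable one. The serialized form also records the lambda's capturing class (see java.lang.invoke.SerializedLambda#getCapturingClass), so that class has to be loadable wherever the lambda is deserialized. LambdaShipping is a hypothetical name. This is only an approximation, since I haven't traced exactly how Jet serializes the pipeline.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

// Hypothetical demo class: what "pass functions as Serializable" means in plain Java.
class LambdaShipping {
    // Intersection cast: the compiler generates a serializable lambda class
    static final Function<String, String> SERIALIZABLE_LOWER =
            (Function<String, String> & Serializable) String::toLowerCase;

    // Ordinary method reference: its generated class is not Serializable
    static final Function<String, String> PLAIN_LOWER = String::toLowerCase;

    static byte[] toBytes(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static boolean canSerialize(Object o) {
        try {
            toBytes(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serialize and read back; deserialization needs the capturing class (LambdaShipping)
    @SuppressWarnings("unchecked")
    static Function<String, String> roundTrip(Function<String, String> f) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(toBytes(f)))) {
            return (Function<String, String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```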

Trying it out

Let's run it.

Start three Hazelcast Jet Server nodes.

## Node 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.EmbeddedJetServer

## Node 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.EmbeddedJetServer

## Node 3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.EmbeddedJetServer

A cluster forms.

Members [3] {
	Member [172.20.0.1]:5701 - 1ff27fc1-798c-4449-8a3b-5721be2838b7 this
	Member [172.20.0.1]:5702 - 3c8e2714-d0b0-47d4-ace2-340e406a9ef3
	Member [172.20.0.1]:5703 - c368c771-a84f-4216-ae14-311f3f35b0cd
}

Run the Word Count.

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.WordCountRunner

The execution plan is printed as well, but it appears on the Server side.

7 09, 2017 12:40:20 AM com.hazelcast.jet.impl.operation.ExecuteJobOperation
INFO: [172.20.0.1]:5701 [jet] [0.4] [3.8.2] Start executing job 1: dag
    .vertex("read-map-word-count-map")
    .vertex("sort-34e8961fcc47").localParallelism(1)
    .vertex("limit-local-1a0370323cca").localParallelism(1)
    .vertex("accumulator").localParallelism(1)
    .vertex("combiner").localParallelism(1)
    .vertex("write-__jet_list_6a26dceb-1c18-4346-90d0-a174aa291b0f")
    .edge(between("read-map-word-count-map", "sort-34e8961fcc47").partitioned(?).distributed())
    .edge(between("sort-34e8961fcc47", "limit-local-1a0370323cca"))
    .edge(between("limit-local-1a0370323cca", "accumulator"))
    .edge(between("accumulator", "combiner").partitioned(?).distributed())
    .edge(between("combiner", "write-__jet_list_6a26dceb-1c18-4346-90d0-a174aa291b0f"))
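The vertex names suggest that the sort-and-limit job runs in two stages. First, "sort" and "limit-local" cut their input down to the first N entries. Then "accumulator" and "combiner" merge those into the final list. That's my reading of the names, not something I verified against Jet's internals. The plain-Java sketch below shows why such a split is safe: the global top N is always contained in the union of every partition's local top N. TwoStageTopN and its methods are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration of a "local limit, then combine" top-N.
class TwoStageTopN {
    static final Comparator<Map.Entry<String, Integer>> BY_COUNT_DESC =
            Map.Entry.<String, Integer>comparingByValue().reversed();

    static Map.Entry<String, Integer> entry(String word, int count) {
        return new AbstractMap.SimpleImmutableEntry<>(word, count);
    }

    // Stage 1 (per partition): sort locally and keep at most n entries
    static List<Map.Entry<String, Integer>> localTopN(List<Map.Entry<String, Integer>> partition, int n) {
        return partition.stream().sorted(BY_COUNT_DESC).limit(n).collect(Collectors.toList());
    }

    // Stage 2 (single place): merge the local winners and cut to n again
    static List<Map.Entry<String, Integer>> globalTopN(List<List<Map.Entry<String, Integer>>> partitions, int n) {
        List<Map.Entry<String, Integer>> candidates = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            candidates.addAll(localTopN(partition, n));
        }
        return candidates.stream().sorted(BY_COUNT_DESC).limit(n).collect(Collectors.toList());
    }
}
```

Each partition only ever sends n entries onward, so the final merge stays small no matter how large the map is.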

You can also see that the processing runs on every Server Node.

## Node 1
INFO: [172.20.0.1]:5701 [jet] [0.4] [3.8.2] Start execution of plan for job 0 from caller [172.20.0.1]:5703.
[hz._hzInstance_1_jet.jet.cooperative.thread-1] averaginglong
[hz._hzInstance_1_jet.jet.cooperative.thread-4] as
[hz._hzInstance_1_jet.jet.cooperative.thread-4] with
[hz._hzInstance_1_jet.jet.cooperative.thread-0] new
[hz._hzInstance_1_jet.jet.cooperative.thread-0] aggregateoperation
[hz._hzInstance_1_jet.jet.cooperative.thread-0] abstraction
[hz._hzInstance_1_jet.jet.cooperative.thread-0] with
[hz._hzInstance_1_jet.jet.cooperative.thread-0] several
[hz._hzInstance_1_jet.jet.cooperative.thread-0] built
[hz._hzInstance_1_jet.jet.cooperative.thread-0] in
[hz._hzInstance_1_jet.jet.cooperative.thread-0] ones
(snip)


## Node 2
[hz._hzInstance_1_jet.jet.cooperative.thread-2] vertex
[hz._hzInstance_1_jet.jet.cooperative.thread-2] source
[hz._hzInstance_1_jet.jet.cooperative.thread-2] dag
[hz._hzInstance_1_jet.jet.cooperative.thread-2] newvertex
[hz._hzInstance_1_jet.jet.cooperative.thread-2] source
[hz._hzInstance_1_jet.jet.cooperative.thread-2] sources
[hz._hzInstance_1_jet.jet.cooperative.thread-2] streamsocket
[hz._hzInstance_1_jet.jet.cooperative.thread-2] host
[hz._hzInstance_1_jet.jet.cooperative.thread-2] port
[hz._hzInstance_1_jet.jet.cooperative.thread-2] several
[hz._hzInstance_1_jet.jet.cooperative.thread-2] examples
[hz._hzInstance_1_jet.jet.cooperative.thread-2] with
[hz._hzInstance_1_jet.jet.cooperative.thread-2] windowing
[hz._hzInstance_1_jet.jet.cooperative.thread-2] have
(snip)


## Node 3
[hz._hzInstance_1_jet.jet.cooperative.thread-2] new
[hz._hzInstance_1_jet.jet.cooperative.thread-2] classes
[hz._hzInstance_1_jet.jet.cooperative.thread-2] have
[hz._hzInstance_1_jet.jet.cooperative.thread-2] been
[hz._hzInstance_1_jet.jet.cooperative.thread-2] added
[hz._hzInstance_1_jet.jet.cooperative.thread-2] to
[hz._hzInstance_1_jet.jet.cooperative.thread-2] make
[hz._hzInstance_1_jet.jet.cooperative.thread-2] it
[hz._hzInstance_1_jet.jet.cooperative.thread-2] easier
[hz._hzInstance_1_jet.jet.cooperative.thread-2] to
[hz._hzInstance_1_jet.jet.cooperative.thread-2] write
[hz._hzInstance_1_jet.jet.cooperative.thread-2] unit
(snip)
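Each node prints a different set of words because each member works on the map entries stored in the partitions it owns. Below is a deliberately simplified model of that spread. Hazelcast uses 271 partitions by default and picks a partition by hashing the serialized key (MurmurHash3). Here, Integer.hashCode and a round-robin partition-to-member assignment stand in for the real hash and ownership table, so the numbers are illustrative only. PartitionSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model only: Hazelcast hashes the serialized key with MurmurHash3 and
// keeps its own partition ownership table; Integer.hashCode and round-robin stand in here.
class PartitionSketch {
    // Hazelcast's default partition count
    static final int PARTITION_COUNT = 271;

    static int partitionOf(int key) {
        return Math.floorMod(Integer.hashCode(key), PARTITION_COUNT);
    }

    // Hypothetical owner assignment: partitions dealt out round-robin to members
    static int ownerOf(int partition, int memberCount) {
        return partition % memberCount;
    }

    // How many of the keys 1..keyCount each member would hold (and therefore read)
    static Map<Integer, Integer> keysPerMember(int keyCount, int memberCount) {
        Map<Integer, Integer> perMember = new TreeMap<>();
        for (int key = 1; key <= keyCount; key++) {
            perMember.merge(ownerOf(partitionOf(key), memberCount), 1, Integer::sum);
        }
        return perMember;
    }

    public static void main(String... args) {
        // 87 = size of the "source" map (one entry per line of the text)
        System.out.println(keysPerMember(87, 3)); // prints {0=29, 1=29, 2=29}
    }
}
```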

Finally, the result, which is printed on the Client side.

INFO: hz.client_0 [jet] [0.4] [3.8.2] Authenticated with server [172.20.0.1]:5702, server version:3.8.2 Local address: /172.20.0.1:44551
result:
  and=28
  the=25
  of=24
  jet=19
  for=18
  a=17
  is=16
  can=15
  to=15
  hazelcast=13
  as=11
  be=11
  in=10
  with=10
  new=9
  streaming=9
  processors=9
  time=8
  also=8
  windows=7

source map size = 87
word-count-map size = 330

The result is the same as in Embedded Mode (naturally).
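One thing to keep in mind when comparing runs: the comparator orders by count only. Words with equal counts (new, streaming and processors at 9, for instance) therefore have no guaranteed relative order. The sketch below adds a key-based tiebreaker and runs on a plain Map. I haven't tried it inside the Jet job, and StableTopN is a hypothetical name.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical variant of the top-20 query with a deterministic order for ties.
class StableTopN {
    // Count descending first, then word ascending among equal counts
    static final Comparator<Map.Entry<String, Integer>> ORDER =
            Map.Entry.<String, Integer>comparingByValue().reversed()
                    .thenComparing(Map.Entry.<String, Integer>comparingByKey());

    static List<String> topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                .sorted(ORDER)
                .limit(n)
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.toList());
    }
}
```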

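Points to watch in Client/Server Mode, and wrap-up

The Client is what submits the job built with the Stream API to the Hazelcast Jet Server cluster, but the classes
implemented on the Client are serialized and executed on the Server Nodes.

Therefore, the classes implemented on the Client side must also be available on the Server Nodes.

If you ignore this and start a plain Hazelcast Jet Server, the Server can't deserialize the job and fails with a ClassNotFoundException, which is reported back to the Client.

INFO: hz.client_0 [jet] [0.4] [3.8.2] HazelcastClient 3.8.2 (20170518 - a60f944) is CLIENT_CONNECTED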
[WARNING] 
java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.ClassNotFoundException: org.littlewings.hazelcast.jet.WordCountRunner
	at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject$Serializer.read(CustomClassLoadedObject.java:108)
	at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject$Serializer.read(CustomClassLoadedObject.java:87)
	at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
	at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:266)
	at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:599)
	at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject.read(CustomClassLoadedObject.java:54)
	at com.hazelcast.jet.Vertex.readData(Vertex.java:167)
(snip)
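The failure above can be reproduced in a single JVM with plain Java serialization. Serialize an object of a client-only class, then deserialize it through a class loader that can't see that class. This is an analogy, not Jet's actual code path. MissingClassDemo, ClientOnlyTask and LoaderInputStream are hypothetical names, and a URLClassLoader with no parent (so it sees only JDK classes) plays the part of a "plain" server classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical single-JVM analogy of the server-side ClassNotFoundException.
class MissingClassDemo {
    // Stands in for a class that exists only on the client's classpath
    static class ClientOnlyTask implements Serializable {
        private static final long serialVersionUID = 1L;
        final String word;

        ClientOnlyTask(String word) {
            this.word = word;
        }
    }

    // Resolves classes through a fixed loader, playing the server's classpath
    static class LoaderInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static String deserializeWith(byte[] bytes, ClassLoader loader) throws IOException {
        try (ObjectInputStream in = new LoaderInputStream(new ByteArrayInputStream(bytes), loader)) {
            in.readObject();
            return "ok";
        } catch (ClassNotFoundException e) {
            return "ClassNotFoundException: " + e.getMessage();
        }
    }

    static String demo(boolean serverHasClass) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new ClientOnlyTask("jet"));
            }
            if (serverHasClass) {
                return deserializeWith(bytes.toByteArray(), MissingClassDemo.class.getClassLoader());
            }
            // parent = null: this loader sees only JDK (bootstrap) classes
            try (URLClassLoader plainServer = new URLClassLoader(new URL[0], null)) {
                return deserializeWith(bytes.toByteArray(), plainServer);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

demo(true) corresponds to this post's setup, where the Server is started from the same project. demo(false) corresponds to the plain Server.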

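So this time I wrote an embedded server and launched it from the same Maven project, which puts the Client's classes on the Server's classpath.

Within the scope of this post, that's about the only thing to watch out for.

Everything else worked without trouble.

The source code for this post is here: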
https://github.com/kazuhira-r/hazelcast-jet-examples/tree/master/client-server-getting-started