CLOVER🍀

That was when it all began.

Spring Cloud StreamのReactive Programming Supportで遊ぶ

Spring Cloud Streamには、Reactor…Reactive Programmingに対するサポートがあります。

Programming Model / Reactive Programming Support

今まで、Reactiveでない方のAPI(詳細見てませんけど、対比してなんて言うんだ?)を使ってばかりでしたが、今回はReactorと合わせて
Spring Cloud Streamを使ってみたいと思います。

この機能を使うと、ReactorのFluxなどのAPIを使って、Spring Cloud Streamの機能を使ったプログラムを書くことができます。

お題

今回のお題は、こういうものでいきましょう。

  • Source、Processor、Sinkの構成とする
  • 各要素をつなぐメッセージングミドルウェアは、Apache Kafkaとする
  • [Source] TwitterのストリームAPIを使い、「#nowplaying」タグを含むツイートをSourceとしてApache Kafkaに放り込む
  • [Processor] CodelibsのApache Lucene Kuromoji Analyzerを使って、ツイートを単語に分解する
    • この時、単語にツイートの日時も付加するようにする
  • [Sink] Processorにより分解された単語を、Window処理でWord Countする
    • Window処理はウィンドウサイズを1分、スライディングする時間を10秒として設定
    • あまり大量に出力されるのも見づらいので、あるウィンドウ内で出現数が5以上の単語のみを出力
    • 最後に、該当のウィンドウに含まれた単語がツイートされた時間をまとめて出力し、ウィンドウやスライディングの処理が想定通りになっているか確認する

要するに、ツイートを1分間ごとにWord Countしようというお題ですね。

これを、ReactorのAPIを使って実現します。

環境

Javaに関する情報は、こちら。

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)


$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_171, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-23-generic", arch: "amd64", family: "unix"

Apache Kafkaは1.1.0を使用し、1 Nodeとします。また、事前に起動済みとします。

Spring Cloud Streamのバージョンは、Elmhurst.RELEASEです。

Maven依存関係

ここでは、親pom.xmlの内容をまずは紹介。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Elmhurst.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.0.1.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.0.1.RELEASE</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

このサブモジュールとして、Source、Processor、Sinkを作っていきます。

Source

では、まずはSourceを作成します。Maven依存関係は、こちら。

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-stream</artifactId>
            <version>4.0.6</version>
        </dependency>

ミニマムでいっているので、Spring WebFluxすら使用していません。

TwitterAPIの利用には、Twitter4Jを使用します。事前に、アカウントと「twitter4j.properties」を用意しておいてください。

データはSpring Cloud Stream経由でApache Kafkaに登録するため「spring-cloud-starter-stream-kafka」が必要ですが、Reactorを使うには
さらに「spring-cloud-stream-reactive」が必要です。

Sourceのソースコードは、こちら。
twitter-source/src/main/java/org/littlewings/spring/cloud/Twiturce.java

package org.littlewings.spring.cloud;

import java.util.LinkedHashMap;
import java.util.Map;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.reactive.StreamEmitter;
import reactor.core.publisher.Flux;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

@SpringBootApplication
@EnableBinding(Source.class)
public class TwitteStreamSource {
    Logger logger = LoggerFactory.getLogger(TwitteStreamSource.class);

    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();

    public static void main(String... args) {
        SpringApplication.run(TwitteStreamSource.class, args);
    }

    @StreamEmitter
    @Output(Source.OUTPUT)
    public Flux<Map<String, Object>> streaming() {
        return Flux.create(sink -> {
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    Map<String, Object> tweet = new LinkedHashMap<>();
                    tweet.put("id", status.getId());
                    tweet.put("screenName", status.getUser().getScreenName());
                    tweet.put("name", status.getUser().getName());
                    tweet.put("text", status.getText());
                    tweet.put("createdAt", status.getCreatedAt());

                    logger.info("tweet = {}", tweet);
                    sink.next(tweet);
                }

                @Override
                public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                }

                @Override
                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                }

                @Override
                public void onScrubGeo(long userId, long upToStatusId) {
                }

                @Override
                public void onStallWarning(StallWarning warning) {
                }

                @Override
                public void onException(Exception e) {
                    e.printStackTrace();
                }
            });

            twitterStream.filter("#nowplaying");
        });
    }

    @PreDestroy
    public void stop() {
        twitterStream.clearListeners();
        twitterStream.cleanUp();
    }
}

ここでのポイントは、@StreamEmitterアノテーションを付与したメソッドとし、@Output(Source.OUTPUT)を指定していることです。この宣言の状態で、
Fluxなどを返すとOKです。

    @StreamEmitter
    @Output(Source.OUTPUT)
    public Flux<Map<String, Object>> streaming() {
        return Flux.create(sink -> {

なお、@OutputについてはFluxSenderを使用してデータを送っても大丈夫です。その場合は@StreamListenerとなっており、サンプルがドキュメントに
記載されています。
Programming Model / Reactive Programming Support

Fluxは、Flux#createを使って、Twitter4JのStatusListenerが受け取った値をMapに詰め替えてSinkに渡す実装としています。

        return Flux.create(sink -> {
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    Map<String, Object> tweet = new LinkedHashMap<>();
                    tweet.put("id", status.getId());
                    tweet.put("screenName", status.getUser().getScreenName());
                    tweet.put("name", status.getUser().getName());
                    tweet.put("text", status.getText());
                    tweet.put("createdAt", status.getCreatedAt());

                    logger.info("tweet = {}", tweet);
                    sink.next(tweet);
                }

一応トレースもしたいので、ツイート内容はログ出力するようにしています。

アプリケーションの設定は、こちら。
twitter-source/src/main/resources/application.properties

spring.cloud.stream.bindings.output.destination=tweet-source-topic
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

Apache Kafkaが動作しているサーバーは、「172.17.0.2」とします。

Twitter4Jの利用にあたっては、「twitter4j.properties」に設定を行っています。
twitter-source/src/main/resources/twitter4j.properties

Processor

続いて、ProcessorMaven依存関係は、こちらです。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codelibs</groupId>
            <artifactId>lucene-analyzers-kuromoji-ipadic-neologd</artifactId>
            <version>7.3.1-20180604</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>codelibs.org</id>
            <name>CodeLibs Repository</name>
            <url>http://maven.codelibs.org/</url>
        </repository>
    </repositories>

Apache Kuromojiは、Codelibsによりmecab-ipadic-NEologdを組み込んでビルドされたものを使用します。

Processorソースコードは、こちら。
tokenize-processor/src/main/java/org/littlewings/spring/cloud/TweetTokenizeProcessor.java

package org.littlewings.spring.cloud;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseAnalyzer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableBinding(Processor.class)
public class TweetTokenizeProcessor {
    Logger logger = LoggerFactory.getLogger(TweetTokenizeProcessor.class);

    JapaneseAnalyzer analyzer = new JapaneseAnalyzer();

    public static void main(String... args) {
        SpringApplication.run(TweetTokenizeProcessor.class, args);
    }

    @StreamListener
    @Output(Processor.OUTPUT)
    public Flux<String[]> receive(@Input(Processor.INPUT) Flux<Map<String, Object>> tweets) {
        return tweets.flatMap(tweet -> {
            String tweetText = (String) tweet.get("text");

            TokenStream tokenStream = analyzer.tokenStream("", tweetText);
            CharTermAttribute charTermAttribute = tokenStream.addAttribute(CharTermAttribute.class);

            try {
                List<String[]> tokens = new ArrayList<>();

                tokenStream.reset();

                while (tokenStream.incrementToken()) {
                    String token = charTermAttribute.toString();

                    if (token.length() >= 5) {
                        String dateAsString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(tweet.get("createdAt"));
                        logger.info("token = {}, time = {}", token, dateAsString);

                        String[] tokenAndDate = {token, dateAsString};
                        tokens.add(tokenAndDate);
                    }
                }

                tokenStream.end();

                return Flux.fromIterable(tokens);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                try {
                    tokenStream.close();
                } catch (IOException e) {
                    // ignore
                }
            }
        });
    }
}

今回は@StreamListenerとして定義し、入力を@InputかつFluxとして、結果を@OutputでFluxとして返すように作成しています。

    @StreamListener
    @Output(Processor.OUTPUT)
    public Flux<String[]> receive(@Input(Processor.INPUT) Flux<Map<String, Object>> tweets) {

形態素解析後は、5文字以上の単語を、ツイートの日時と一緒にApache Kafkaに送ります。

                while (tokenStream.incrementToken()) {
                    String token = charTermAttribute.toString();

                    if (token.length() >= 5) {
                        String dateAsString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(tweet.get("createdAt"));
                        logger.info("token = {}, time = {}", token, dateAsString);

                        String[] tokenAndDate = {token, dateAsString};
                        tokens.add(tokenAndDate);
                    }
                }

アプリケーションの設定は、こちら。
tokenize-processor/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=tweet-source-topic
spring.cloud.stream.bindings.input.group=tweet-source-group

spring.cloud.stream.bindings.output.destination=tweet-word-topic
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

Sink

最後は、Sinkです。

Maven依存関係は、こちら。

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-reactive</artifactId>
        </dependency>

ソースコードは、こちら。
token-sink/src/main/java/org/littlewings/spring/cloud/WordCountSink.java

package org.littlewings.spring.cloud;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@SpringBootApplication
@EnableBinding(Sink.class)
public class WordCountSink {
    Logger logger = LoggerFactory.getLogger(WordCountSink.class);

    public static void main(String... args) {
        SpringApplication.run(WordCountSink.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void wordCount(Flux<String[]> words) {
        words
                .window(Duration.ofMinutes(1L), Duration.ofSeconds(10L))
                .subscribe(ws -> {
                    Mono<TimeRecords> records = ws.reduce(new TimeRecords(), (acc, word) -> {
                        acc.addDate(word[1]);
                        acc.increment(word[0]);
                        return acc;
                    });

                    records.subscribe(m -> {
                        m.getWordCounts().entrySet().forEach(entry -> {
                            if (entry.getValue() >= 5) {
                                logger.info(
                                        "word = {}, count = {}",
                                        entry.getKey(),
                                        entry.getValue()
                                );
                            }
                        });

                        logger.info("contain times = {}", m.times);
                    });
                });
    }

    static class TimeRecords {
        Set<String> times = new TreeSet<>();

        Map<String, Integer> wordCounts = new TreeMap<>();

        public void addDate(String date) {
            times.add(date);
        }

        public void increment(String word) {
            wordCounts.merge(word, 1, Integer::sum);
        }

        public Map<String, Integer> getWordCounts() {
            return wordCounts;
        }
    }
}

@StreamListenerを付けて、Processorで単語分割した結果と、ツイートの日時(文字列ですが)をFluxとして受け取ります。

    @StreamListener(Sink.INPUT)
    public void wordCount(Flux<String[]> words) {

そして、これをWord Count。

ウィンドウサイズを1分にして、10秒ごとにスライドさせるようにしました。

        words
                .window(Duration.ofMinutes(1L), Duration.ofSeconds(10L))

Word Countは、ちょっと箱物クラスを作って、その中で行います。

    static class TimeRecords {
        Set<String> times = new TreeSet<>();

        Map<String, Integer> wordCounts = new TreeMap<>();

        public void addDate(String date) {
            times.add(date);
        }

        public void increment(String word) {
            wordCounts.merge(word, 1, Integer::sum);
        }

        public Map<String, Integer> getWordCounts() {
            return wordCounts;
        }
    }

単語と出現回数自体はMapで持つのですが、その時に単語分割した時に得られた日時も持っておきます。

これで、ウィンドウ内に含まれる時間を見ようかなと。

集計部分。

                .subscribe(ws -> {
                    Mono<TimeRecords> records = ws.reduce(new TimeRecords(), (acc, word) -> {
                        acc.addDate(word[1]);
                        acc.increment(word[0]);
                        return acc;
                    });

                    records.subscribe(m -> {
                        m.getWordCounts().entrySet().forEach(entry -> {
                            if (entry.getValue() >= 5) {
                                logger.info(
                                        "word = {}, count = {}",
                                        entry.getKey(),
                                        entry.getValue()
                                );
                            }
                        });

                        logger.info("contain times = {}", m.times);
                    });
                });

結果は標準出力に出すのですが、あんまり出力されすぎるのもなんなので、登場回数が5回以上のものにしました。

ただ、処理対象となった日時は全部含めるものとしています。

アプリケーションの設定は、こちら。
token-sink/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=tweet-word-topic
spring.cloud.stream.bindings.input.group=tweet-word-group

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

実行

では、作成したアプリケーションを実行してみます。

パッケージング。

$ mvn package

実行。

## Source
$ java -jar twitter-source/target/twitter-source-0.0.1-SNAPSHOT.jar


## Processor
$ java -jar tokenize-processor/target/tokenize-processor-0.0.1-SNAPSHOT.jar


## Sink
$ java -jar token-sink/target/token-sink-0.0.1-SNAPSHOT.jar

実行ログ。

Source…のログは、Twitterのidとかいろいろ出てくるので、省略。

Processorhttpsとか入っているのは、ご愛嬌…。

2018-06-19 21:02:57.645  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = リニューアル, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.646  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.646  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = https, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.807  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = 08234, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.808  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.858  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = rokomama, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = ジェジュン, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = ジェジュン, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = https, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.860  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = bdbyh, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.869  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = hjinny, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = ジェジュン, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = https, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = sarwrsgk, time = 2018-06-19 21:02:57
2018-06-19 21:02:58.158  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = smallzy, time = 2018-06-19 21:02:57

Sink側は、最初は1分、それ以降は10秒ごとに、スライドしながら結果が出力されます。

2018-06-19 21:03:34.308  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = =love, count = 5
2018-06-19 21:03:34.308  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = bornfreeonekiss, count = 14
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = favourite, count = 5
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = https, count = 232
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = japan, count = 7
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = jejung, count = 30
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = let's, count = 5
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = listen, count = 11
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = listenlive, count = 5
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = love music, count = 5
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = mrskodako, count = 6
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = music, count = 14
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = nowplaying, count = 273
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = radio, count = 9
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = single, count = 5
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = smallzy, count = 5
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = smallzyssurgery, count = 5
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = sukatto, count = 7
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = the the, count = 11
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = youngblood, count = 5
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = ありがとう, count = 10
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = ジェジュン, count = 166
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = リクエスト, count = 15
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = 痛快tvスカッとジャパン, count = 5
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : contain times = [2018-06-19 21:02:34, 2018-06-19 21:02:35, 2018-06-19 21:02:36, 2018-06-19 21:02:37, 2018-06-19 21:02:38, 2018-06-19 21:02:39, 2018-06-19 21:02:40, 2018-06-19 21:02:41, 2018-06-19 21:02:42, 2018-06-19 21:02:43, 2018-06-19 21:02:44, 2018-06-19 21:02:45, 2018-06-19 21:02:46, 2018-06-19 21:02:47, 2018-06-19 21:02:48, 2018-06-19 21:02:49, 2018-06-19 21:02:50, 2018-06-19 21:02:51, 2018-06-19 21:02:52, 2018-06-19 21:02:53, 2018-06-19 21:02:54, 2018-06-19 21:02:55, 2018-06-19 21:02:56, 2018-06-19 21:02:57, 2018-06-19 21:02:58, 2018-06-19 21:02:59, 2018-06-19 21:03:00, 2018-06-19 21:03:01, 2018-06-19 21:03:02, 2018-06-19 21:03:03, 2018-06-19 21:03:04, 2018-06-19 21:03:05, 2018-06-19 21:03:06, 2018-06-19 21:03:07, 2018-06-19 21:03:08, 2018-06-19 21:03:09, 2018-06-19 21:03:10, 2018-06-19 21:03:11, 2018-06-19 21:03:12, 2018-06-19 21:03:13, 2018-06-19 21:03:14, 2018-06-19 21:03:15, 2018-06-19 21:03:16, 2018-06-19 21:03:17, 2018-06-19 21:03:18, 2018-06-19 21:03:19, 2018-06-19 21:03:20, 2018-06-19 21:03:21, 2018-06-19 21:03:22, 2018-06-19 21:03:23, 2018-06-19 21:03:24, 2018-06-19 21:03:25, 2018-06-19 21:03:26, 2018-06-19 21:03:27, 2018-06-19 21:03:28, 2018-06-19 21:03:29, 2018-06-19 21:03:30, 2018-06-19 21:03:31, 2018-06-19 21:03:32, 2018-06-19 21:03:33, 2018-06-19 21:03:34]


2018-06-19 21:03:44.309  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = =love, count = 5
2018-06-19 21:03:44.309  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = bornfreeonekiss, count = 15
2018-06-19 21:03:44.310  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = favourite, count = 8
2018-06-19 21:03:44.310  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = https, count = 231
2018-06-19 21:03:44.310  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = instagram, count = 5
2018-06-19 21:03:44.310  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = japan, count = 7
2018-06-19 21:03:44.311  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = jejung, count = 30
2018-06-19 21:03:44.311  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = let's, count = 8
2018-06-19 21:03:44.311  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = listen, count = 12
2018-06-19 21:03:44.312  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = listenlive, count = 6
2018-06-19 21:03:44.312  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = love music, count = 5
2018-06-19 21:03:44.313  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = mrskodako, count = 5
2018-06-19 21:03:44.313  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = music, count = 13
2018-06-19 21:03:44.314  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = nowplaying, count = 278
2018-06-19 21:03:44.314  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = radio, count = 13
2018-06-19 21:03:44.315  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = retweet, count = 7
2018-06-19 21:03:44.315  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = single, count = 5
2018-06-19 21:03:44.315  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = smallzy, count = 8
2018-06-19 21:03:44.316  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = smallzyssurgery, count = 8
2018-06-19 21:03:44.316  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = sukatto, count = 7
2018-06-19 21:03:44.317  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = the now, count = 6
2018-06-19 21:03:44.317  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = the the, count = 7
2018-06-19 21:03:44.318  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = tweet, count = 7
2018-06-19 21:03:44.318  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = youngblood, count = 8
2018-06-19 21:03:44.319  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = ありがとう, count = 9
2018-06-19 21:03:44.319  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = ジェジュン, count = 162
2018-06-19 21:03:44.319  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = リクエスト, count = 14
2018-06-19 21:03:44.320  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : contain times = [2018-06-19 21:02:44, 2018-06-19 21:02:45, 2018-06-19 21:02:46, 2018-06-19 21:02:47, 2018-06-19 21:02:48, 2018-06-19 21:02:49, 2018-06-19 21:02:50, 2018-06-19 21:02:51, 2018-06-19 21:02:52, 2018-06-19 21:02:53, 2018-06-19 21:02:54, 2018-06-19 21:02:55, 2018-06-19 21:02:56, 2018-06-19 21:02:57, 2018-06-19 21:02:58, 2018-06-19 21:02:59, 2018-06-19 21:03:00, 2018-06-19 21:03:01, 2018-06-19 21:03:02, 2018-06-19 21:03:03, 2018-06-19 21:03:04, 2018-06-19 21:03:05, 2018-06-19 21:03:06, 2018-06-19 21:03:07, 2018-06-19 21:03:08, 2018-06-19 21:03:09, 2018-06-19 21:03:10, 2018-06-19 21:03:11, 2018-06-19 21:03:12, 2018-06-19 21:03:13, 2018-06-19 21:03:14, 2018-06-19 21:03:15, 2018-06-19 21:03:16, 2018-06-19 21:03:17, 2018-06-19 21:03:18, 2018-06-19 21:03:19, 2018-06-19 21:03:20, 2018-06-19 21:03:21, 2018-06-19 21:03:22, 2018-06-19 21:03:23, 2018-06-19 21:03:24, 2018-06-19 21:03:25, 2018-06-19 21:03:26, 2018-06-19 21:03:27, 2018-06-19 21:03:28, 2018-06-19 21:03:29, 2018-06-19 21:03:30, 2018-06-19 21:03:31, 2018-06-19 21:03:32, 2018-06-19 21:03:33, 2018-06-19 21:03:34, 2018-06-19 21:03:35, 2018-06-19 21:03:36, 2018-06-19 21:03:37, 2018-06-19 21:03:38, 2018-06-19 21:03:39, 2018-06-19 21:03:40, 2018-06-19 21:03:41, 2018-06-19 21:03:42, 2018-06-19 21:03:43]

ログを見ると、どの範囲の時間が含まれているか、わかる感じですね。

まとめ

今回は、Spring Cloud StreamのReactive Programming Supportを試すということで、Twitterからストリームを読み込み、Apache Kafkaに放り込み、単語分割し、
最後にWord CountするところまでをReactorを使ってやってみました。

最近Reactorを使っていなかったのでちょっとてこずりましたが、使っていないAPIなども試しつつ、理解が進んだ感じでよかったかなと。