Spring Cloud Stream supports reactive programming with Reactor.
Programming Model / Reactive Programming Support
So far I have only used the non-reactive API (I have not looked into the details, so I am not sure what it is called by contrast). This time I want to try Spring Cloud Stream together with Reactor.
With this feature, you can write programs that use Spring Cloud Stream through Reactor APIs such as Flux.
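The task
Here is what I will build this time.
- Use a Source, Processor, and Sink configuration
- Use Apache Kafka as the messaging middleware that connects the elements
- [Source] Use the Twitter streaming API and push tweets containing the "#nowplaying" tag into Apache Kafka
- [Processor] Use the CodeLibs Apache Lucene Kuromoji Analyzer to split each tweet into words
  - Attach the tweet's timestamp to each word at this point
- [Sink] Run a windowed word count over the words produced by the Processor
  - The window size is 1 minute and the window slides every 10 seconds
  - To keep the output readable, print only words that appear 5 times or more within a window
  - Finally, print the tweet times of all words included in the window, to confirm that windowing and sliding behave as expected
In short, the task is a word count over tweets in one-minute windows.
I will implement it with the Reactor API.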
Environment
Here is the Java and Maven information.
$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)

$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_171, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-23-generic", arch: "amd64", family: "unix"
Apache Kafka is version 1.1.0, running as a single node, and is assumed to be already started.
The Spring Cloud Stream version is Elmhurst.RELEASE.
Maven dependencies
First, here is the content of the parent pom.xml.
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>Elmhurst.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.0.1.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.0.1.RELEASE</version>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
The Source, Processor, and Sink are created as submodules of this parent.
Source
Let's start with the Source. Its Maven dependencies are as follows.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>

<dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>4.0.6</version>
</dependency>
I am keeping things minimal, so I do not even use Spring WebFlux.
Twitter4J is used to access the Twitter API. Prepare an account and a "twitter4j.properties" file in advance.
Data is published to Apache Kafka through Spring Cloud Stream, so "spring-cloud-starter-stream-kafka" is required. To use Reactor, "spring-cloud-stream-reactive" is needed as well.
Here is the source code of the Source.
twitter-source/src/main/java/org/littlewings/spring/cloud/TwitteStreamSource.java
package org.littlewings.spring.cloud;

import java.util.LinkedHashMap;
import java.util.Map;

import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.reactive.StreamEmitter;
import reactor.core.publisher.Flux;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

@SpringBootApplication
@EnableBinding(Source.class)
public class TwitteStreamSource {
    Logger logger = LoggerFactory.getLogger(TwitteStreamSource.class);

    // Credentials are read from twitter4j.properties
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();

    public static void main(String... args) {
        SpringApplication.run(TwitteStreamSource.class, args);
    }

    // Emits each received tweet to the output binding
    @StreamEmitter
    @Output(Source.OUTPUT)
    public Flux<Map<String, Object>> streaming() {
        return Flux.create(sink -> {
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    // Repack the fields of interest into a Map
                    Map<String, Object> tweet = new LinkedHashMap<>();
                    tweet.put("id", status.getId());
                    tweet.put("screenName", status.getUser().getScreenName());
                    tweet.put("name", status.getUser().getName());
                    tweet.put("text", status.getText());
                    tweet.put("createdAt", status.getCreatedAt());

                    logger.info("tweet = {}", tweet);

                    sink.next(tweet);
                }

                @Override
                public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                }

                @Override
                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                }

                @Override
                public void onScrubGeo(long userId, long upToStatusId) {
                }

                @Override
                public void onStallWarning(StallWarning warning) {
                }

                @Override
                public void onException(Exception e) {
                    e.printStackTrace();
                }
            });

            // Start streaming tweets that contain the tag
            twitterStream.filter("#nowplaying");
        });
    }

    @PreDestroy
    public void stop() {
        twitterStream.clearListeners();
        twitterStream.cleanUp();
    }
}
The key point here is that the method is annotated with @StreamEmitter and specifies @Output(Source.OUTPUT). With this declaration, returning a Flux (or similar) is all that is needed.
@StreamEmitter
@Output(Source.OUTPUT)
public Flux<Map<String, Object>> streaming() {
    return Flux.create(sink -> {
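Note that for @Output you can also send data with a FluxSender. In that case the method is annotated with @StreamListener, and a sample is given in the documentation.
Programming Model / Reactive Programming Support
The Flux is built with Flux#create: values received by Twitter4J's StatusListener are repacked into a Map and passed to the sink.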
return Flux.create(sink -> {
    twitterStream.addListener(new StatusListener() {
        @Override
        public void onStatus(Status status) {
            Map<String, Object> tweet = new LinkedHashMap<>();
            tweet.put("id", status.getId());
            tweet.put("screenName", status.getUser().getScreenName());
            tweet.put("name", status.getUser().getName());
            tweet.put("text", status.getText());
            tweet.put("createdAt", status.getCreatedAt());

            logger.info("tweet = {}", tweet);

            sink.next(tweet);
        }
I also want to be able to trace what happens, so the tweet content is written to the log.
Here are the application settings.
twitter-source/src/main/resources/application.properties
spring.cloud.stream.bindings.output.destination=tweet-source-topic
spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
The server running Apache Kafka is assumed to be "172.17.0.2".
Twitter4J itself is configured in "twitter4j.properties".
twitter-source/src/main/resources/twitter4j.properties
Processor
Next is the Processor. Its Maven dependencies are as follows.
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-reactive</artifactId>
    </dependency>

    <dependency>
        <groupId>org.codelibs</groupId>
        <artifactId>lucene-analyzers-kuromoji-ipadic-neologd</artifactId>
        <version>7.3.1-20180604</version>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>codelibs.org</id>
        <name>CodeLibs Repository</name>
        <url>http://maven.codelibs.org/</url>
    </repository>
</repositories>
For Kuromoji, I use the build published by CodeLibs, which bundles mecab-ipadic-NEologd.
Here is the source code of the Processor.
tokenize-processor/src/main/java/org/littlewings/spring/cloud/TweetTokenizeProcessor.java
package org.littlewings.spring.cloud;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseAnalyzer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableBinding(Processor.class)
public class TweetTokenizeProcessor {
    Logger logger = LoggerFactory.getLogger(TweetTokenizeProcessor.class);

    JapaneseAnalyzer analyzer = new JapaneseAnalyzer();

    public static void main(String... args) {
        SpringApplication.run(TweetTokenizeProcessor.class, args);
    }

    // Receives tweets as a Flux and emits {token, timestamp} pairs
    @StreamListener
    @Output(Processor.OUTPUT)
    public Flux<String[]> receive(@Input(Processor.INPUT) Flux<Map<String, Object>> tweets) {
        return tweets.flatMap(tweet -> {
            String tweetText = (String) tweet.get("text");

            TokenStream tokenStream = analyzer.tokenStream("", tweetText);
            CharTermAttribute charTermAttribute = tokenStream.addAttribute(CharTermAttribute.class);

            try {
                List<String[]> tokens = new ArrayList<>();

                tokenStream.reset();

                while (tokenStream.incrementToken()) {
                    String token = charTermAttribute.toString();

                    // Keep only tokens of 5 characters or more
                    if (token.length() >= 5) {
                        String dateAsString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(tweet.get("createdAt"));
                        logger.info("token = {}, time = {}", token, dateAsString);

                        String[] tokenAndDate = {token, dateAsString};
                        tokens.add(tokenAndDate);
                    }
                }

                tokenStream.end();

                return Flux.fromIterable(tokens);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                try {
                    tokenStream.close();
                } catch (IOException e) {
                    // ignore
                }
            }
        });
    }
}
This time the method is defined as a @StreamListener: it takes the input as a Flux annotated with @Input and returns the result as a Flux via @Output.
@StreamListener
@Output(Processor.OUTPUT)
public Flux<String[]> receive(@Input(Processor.INPUT) Flux<Map<String, Object>> tweets) {
After morphological analysis, words of 5 characters or more are sent to Apache Kafka together with the tweet's timestamp.
while (tokenStream.incrementToken()) {
    String token = charTermAttribute.toString();

    if (token.length() >= 5) {
        String dateAsString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(tweet.get("createdAt"));
        logger.info("token = {}, time = {}", token, dateAsString);

        String[] tokenAndDate = {token, dateAsString};
        tokens.add(tokenAndDate);
    }
}
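One detail worth spelling out: "createdAt" is a java.util.Date in the Source, but the Processor reads it back from a Map after a JSON round trip. The sketch below assumes it arrives as epoch milliseconds (a Number); the post does not verify this, so treat it as an assumption. DateFormat#format(Object) accepts both Date and Number, which would explain why the Processor code needs no cast. The class and method names (TokenAndDateSketch, formatCreatedAt, pairLongTokens) and the fixed UTC time zone are hypothetical, chosen only to make the example deterministic.

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

final class TokenAndDateSketch {
    // Hypothetical helper: formats createdAt whether it is a Date or epoch milliseconds.
    static String formatCreatedAt(Object createdAt) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Assumption for this sketch only: UTC, so the output does not depend on the machine.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        // format(Object) handles Date and Number; any other type throws IllegalArgumentException.
        return format.format(createdAt);
    }

    // Hypothetical helper: keeps tokens of 5 characters or more and pairs each with the date string.
    static List<String[]> pairLongTokens(List<String> tokens, Object createdAt) {
        String dateAsString = formatCreatedAt(createdAt);
        List<String[]> result = new ArrayList<>();
        for (String token : tokens) {
            if (token.length() >= 5) {
                result.add(new String[] {token, dateAsString});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long epochMillis = 0L;
        System.out.println(formatCreatedAt(epochMillis));           // 1970-01-01 00:00:00
        System.out.println(formatCreatedAt(new Date(epochMillis))); // 1970-01-01 00:00:00
        for (String[] pair : pairLongTokens(Arrays.asList("now", "nowplaying", "https"), epochMillis)) {
            System.out.println(pair[0] + " @ " + pair[1]);
        }
    }
}
```

If the value arrived as an ISO-8601 string instead, format(Object) would throw, so it is worth checking the actual payload type before relying on this.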
Here are the application settings.
tokenize-processor/src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=tweet-source-topic
spring.cloud.stream.bindings.input.group=tweet-source-group
spring.cloud.stream.bindings.output.destination=tweet-word-topic
spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
Sink
Last is the Sink.
Its Maven dependencies are as follows.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
Here is the source code.
token-sink/src/main/java/org/littlewings/spring/cloud/WordCountSink.java
package org.littlewings.spring.cloud;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@SpringBootApplication
@EnableBinding(Sink.class)
public class WordCountSink {
    Logger logger = LoggerFactory.getLogger(WordCountSink.class);

    public static void main(String... args) {
        SpringApplication.run(WordCountSink.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void wordCount(Flux<String[]> words) {
        words
                .window(Duration.ofMinutes(1L), Duration.ofSeconds(10L))
                .subscribe(ws -> {
                    Mono<TimeRecords> records = ws.reduce(new TimeRecords(), (acc, word) -> {
                        acc.addDate(word[1]);
                        acc.increment(word[0]);
                        return acc;
                    });

                    records.subscribe(m -> {
                        m.getWordCounts().entrySet().forEach(entry -> {
                            if (entry.getValue() >= 5) {
                                logger.info(
                                        "word = {}, count = {}",
                                        entry.getKey(),
                                        entry.getValue()
                                );
                            }
                        });

                        logger.info("contain times = {}", m.times);
                    });
                });
    }

    static class TimeRecords {
        Set<String> times = new TreeSet<>();
        Map<String, Integer> wordCounts = new TreeMap<>();

        public void addDate(String date) {
            times.add(date);
        }

        public void increment(String word) {
            wordCounts.merge(word, 1, Integer::sum);
        }

        public Map<String, Integer> getWordCounts() {
            return wordCounts;
        }
    }
}
With @StreamListener, the method receives as a Flux the words split by the Processor together with the tweet timestamp (as a string).
@StreamListener(Sink.INPUT)
public void wordCount(Flux<String[]> words) {
Then comes the word count.
The window size is 1 minute, and the window slides every 10 seconds.
words
        .window(Duration.ofMinutes(1L), Duration.ofSeconds(10L))
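Because a new 1-minute window opens every 10 seconds, windows overlap and a single word is counted in up to six of them. The following plain-Java sketch only illustrates that arithmetic and does not use Reactor: it models time as whole seconds since subscription and treats each window as the half-open interval [start, start + size). Both are simplifying assumptions, and Reactor's timer-based scheduling may place boundary elements differently. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {
    // Returns the start offsets (in seconds) of every window that contains the element.
    static List<Long> windowStartsContaining(long elementSec, long sizeSec, long slideSec) {
        List<Long> starts = new ArrayList<>();
        // The most recent window that opened at or before the element.
        long latestStart = (elementSec / slideSec) * slideSec;
        for (long start = latestStart; start >= 0 && start + sizeSec > elementSec; start -= slideSec) {
            starts.add(0, start); // prepend so the result is in ascending order
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5 seconds in: only the first window is open.
        System.out.println(windowStartsContaining(5, 60, 10));  // [0]
        // 75 seconds in: six overlapping windows contain the element.
        System.out.println(windowStartsContaining(75, 60, 10)); // [20, 30, 40, 50, 60, 70]
    }
}
```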
The word count itself is done inside a small container class.
static class TimeRecords {
    Set<String> times = new TreeSet<>();
    Map<String, Integer> wordCounts = new TreeMap<>();

    public void addDate(String date) {
        times.add(date);
    }

    public void increment(String word) {
        wordCounts.merge(word, 1, Integer::sum);
    }

    public Map<String, Integer> getWordCounts() {
        return wordCounts;
    }
}
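Words and their counts are held in a Map, and the class also keeps the timestamps that came with the tokenized words.
The idea is to use them to see which times fall inside each window.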
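Keeping the timestamps in a TreeSet has two convenient effects: duplicates collapse to a single entry, and because the strings use the fixed-width "yyyy-MM-dd HH:mm:ss" layout, their lexicographic order is also chronological order. A minimal sketch of that behavior follows; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

final class SortedTimesSketch {
    // Collects timestamp strings into a sorted, duplicate-free set.
    static SortedSet<String> collectTimes(List<String> times) {
        return new TreeSet<>(times);
    }

    public static void main(String[] args) {
        SortedSet<String> times = collectTimes(Arrays.asList(
                "2018-06-19 21:02:35",
                "2018-06-19 21:02:34",
                "2018-06-19 21:02:35"));
        // Prints the two distinct timestamps, earliest first.
        System.out.println(times);
    }
}
```

The first and last elements of the set therefore give the time range covered by a window.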
The aggregation part.
.subscribe(ws -> {
    Mono<TimeRecords> records = ws.reduce(new TimeRecords(), (acc, word) -> {
        acc.addDate(word[1]);
        acc.increment(word[0]);
        return acc;
    });

    records.subscribe(m -> {
        m.getWordCounts().entrySet().forEach(entry -> {
            if (entry.getValue() >= 5) {
                logger.info(
                        "word = {}, count = {}",
                        entry.getKey(),
                        entry.getValue()
                );
            }
        });

        logger.info("contain times = {}", m.times);
    });
});
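Results are logged to standard output, but to keep the volume down I only print words that appear 5 times or more.
The timestamps, however, include every element processed in the window.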
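The per-window logic is a fold followed by a threshold filter. The sketch below reproduces it on an in-memory list instead of a Flux window, which is a substitution made only so the example runs without Reactor. The class and method names are hypothetical; the {word, timestamp} pair layout and the threshold of 5 follow the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WindowCountSketch {
    // Folds {word, timestamp} pairs into word counts, sorted by word.
    static Map<String, Integer> countWords(List<String[]> window) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] wordAndDate : window) {
            counts.merge(wordAndDate[0], 1, Integer::sum);
        }
        return counts;
    }

    // Builds one report line per word whose count reaches the threshold.
    static List<String> report(Map<String, Integer> counts, int threshold) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() >= threshold) {
                lines.add("word = " + entry.getKey() + ", count = " + entry.getValue());
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String[]> window = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            window.add(new String[] {"nowplaying", "2018-06-19 21:02:34"});
        }
        window.add(new String[] {"music", "2018-06-19 21:02:35"});
        window.add(new String[] {"music", "2018-06-19 21:02:36"});

        // Only "nowplaying" reaches the threshold of 5.
        for (String line : report(countWords(window), 5)) {
            System.out.println(line);
        }
    }
}
```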
Here are the application settings.
token-sink/src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=tweet-word-topic
spring.cloud.stream.bindings.input.group=tweet-word-group
spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
Running
Now let's run the applications.
Packaging.
$ mvn package
Running.
## Source
$ java -jar twitter-source/target/twitter-source-0.0.1-SNAPSHOT.jar

## Processor
$ java -jar tokenize-processor/target/tokenize-processor-0.0.1-SNAPSHOT.jar

## Sink
$ java -jar token-sink/target/token-sink-0.0.1-SNAPSHOT.jar
Execution logs.
I omit the Source log, since it contains Twitter IDs and various other details.
Processor. Please forgive tokens such as https showing up.
2018-06-19 21:02:57.645 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = リニューアル, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.646 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.646 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = https, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.807 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = 08234, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.808 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.858 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = rokomama, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = ジェジュン, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = ジェジュン, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = https, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.860 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = bdbyh, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.869 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = hjinny, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = ジェジュン, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = https, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = sarwrsgk, time = 2018-06-19 21:02:57
2018-06-19 21:02:58.158 INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor : token = smallzy, time = 2018-06-19 21:02:57
On the Sink side, the first result appears after 1 minute; after that, results are output every 10 seconds as the window slides.
2018-06-19 21:03:34.308 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = =love, count = 5
2018-06-19 21:03:34.308 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = bornfreeonekiss, count = 14
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = favourite, count = 5
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = https, count = 232
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = japan, count = 7
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = jejung, count = 30
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = let's, count = 5
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = listen, count = 11
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = listenlive, count = 5
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = love music, count = 5
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = mrskodako, count = 6
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = music, count = 14
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = nowplaying, count = 273
2018-06-19 21:03:34.309 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = radio, count = 9
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = single, count = 5
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = smallzy, count = 5
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = smallzyssurgery, count = 5
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = sukatto, count = 7
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = the the, count = 11
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = youngblood, count = 5
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = ありがとう, count = 10
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = ジェジュン, count = 166
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = リクエスト, count = 15
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : word = 痛快tvスカッとジャパン, count = 5
2018-06-19 21:03:34.310 INFO 15255 --- [ parallel-2] o.l.spring.cloud.WordCountSink : contain times = [2018-06-19 21:02:34, 2018-06-19 21:02:35, 2018-06-19 21:02:36, 2018-06-19 21:02:37, 2018-06-19 21:02:38, 2018-06-19 21:02:39, 2018-06-19 21:02:40, 2018-06-19 21:02:41, 2018-06-19 21:02:42, 2018-06-19 21:02:43, 2018-06-19 21:02:44, 2018-06-19 21:02:45, 2018-06-19 21:02:46, 2018-06-19 21:02:47, 2018-06-19 21:02:48, 2018-06-19 21:02:49, 2018-06-19 21:02:50, 2018-06-19 21:02:51, 2018-06-19 21:02:52, 2018-06-19 21:02:53, 2018-06-19 21:02:54, 2018-06-19 21:02:55, 2018-06-19 21:02:56, 2018-06-19 21:02:57, 2018-06-19 21:02:58, 2018-06-19 21:02:59, 2018-06-19 21:03:00, 2018-06-19 21:03:01, 2018-06-19 21:03:02, 2018-06-19 21:03:03, 2018-06-19 21:03:04, 2018-06-19 21:03:05, 2018-06-19 21:03:06, 2018-06-19 21:03:07, 2018-06-19 21:03:08, 2018-06-19 21:03:09, 2018-06-19 21:03:10, 2018-06-19 21:03:11, 2018-06-19 21:03:12, 2018-06-19 21:03:13, 2018-06-19 21:03:14, 2018-06-19 21:03:15, 2018-06-19 21:03:16, 2018-06-19 21:03:17, 2018-06-19 21:03:18, 2018-06-19 21:03:19, 2018-06-19 21:03:20, 2018-06-19 21:03:21, 2018-06-19 21:03:22, 2018-06-19 21:03:23, 2018-06-19 21:03:24, 2018-06-19 21:03:25, 2018-06-19 21:03:26, 2018-06-19 21:03:27, 2018-06-19 21:03:28, 2018-06-19 21:03:29, 2018-06-19 21:03:30, 2018-06-19 21:03:31, 2018-06-19 21:03:32, 2018-06-19 21:03:33, 2018-06-19 21:03:34]
2018-06-19 21:03:44.309 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = =love, count = 5
2018-06-19 21:03:44.309 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = bornfreeonekiss, count = 15
2018-06-19 21:03:44.310 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = favourite, count = 8
2018-06-19 21:03:44.310 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = https, count = 231
2018-06-19 21:03:44.310 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = instagram, count = 5
2018-06-19 21:03:44.310 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = japan, count = 7
2018-06-19 21:03:44.311 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = jejung, count = 30
2018-06-19 21:03:44.311 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = let's, count = 8
2018-06-19 21:03:44.311 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = listen, count = 12
2018-06-19 21:03:44.312 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = listenlive, count = 6
2018-06-19 21:03:44.312 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = love music, count = 5
2018-06-19 21:03:44.313 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = mrskodako, count = 5
2018-06-19 21:03:44.313 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = music, count = 13
2018-06-19 21:03:44.314 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = nowplaying, count = 278
2018-06-19 21:03:44.314 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = radio, count = 13
2018-06-19 21:03:44.315 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = retweet, count = 7
2018-06-19 21:03:44.315 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = single, count = 5
2018-06-19 21:03:44.315 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = smallzy, count = 8
2018-06-19 21:03:44.316 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = smallzyssurgery, count = 8
2018-06-19 21:03:44.316 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = sukatto, count = 7
2018-06-19 21:03:44.317 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = the now, count = 6
2018-06-19 21:03:44.317 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = the the, count = 7
2018-06-19 21:03:44.318 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = tweet, count = 7
2018-06-19 21:03:44.318 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = youngblood, count = 8
2018-06-19 21:03:44.319 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = ありがとう, count = 9
2018-06-19 21:03:44.319 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = ジェジュン, count = 162
2018-06-19 21:03:44.319 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : word = リクエスト, count = 14
2018-06-19 21:03:44.320 INFO 15255 --- [ parallel-3] o.l.spring.cloud.WordCountSink : contain times = [2018-06-19 21:02:44, 2018-06-19 21:02:45, 2018-06-19 21:02:46, 2018-06-19 21:02:47, 2018-06-19 21:02:48, 2018-06-19 21:02:49, 2018-06-19 21:02:50, 2018-06-19 21:02:51, 2018-06-19 21:02:52, 2018-06-19 21:02:53, 2018-06-19 21:02:54, 2018-06-19 21:02:55, 2018-06-19 21:02:56, 2018-06-19 21:02:57, 2018-06-19 21:02:58, 2018-06-19 21:02:59, 2018-06-19 21:03:00, 2018-06-19 21:03:01, 2018-06-19 21:03:02, 2018-06-19 21:03:03, 2018-06-19 21:03:04, 2018-06-19 21:03:05, 2018-06-19 21:03:06, 2018-06-19 21:03:07, 2018-06-19 21:03:08, 2018-06-19 21:03:09, 2018-06-19 21:03:10, 2018-06-19 21:03:11, 2018-06-19 21:03:12, 2018-06-19 21:03:13, 2018-06-19 21:03:14, 2018-06-19 21:03:15, 2018-06-19 21:03:16, 2018-06-19 21:03:17, 2018-06-19 21:03:18, 2018-06-19 21:03:19, 2018-06-19 21:03:20, 2018-06-19 21:03:21, 2018-06-19 21:03:22, 2018-06-19 21:03:23, 2018-06-19 21:03:24, 2018-06-19 21:03:25, 2018-06-19 21:03:26, 2018-06-19 21:03:27, 2018-06-19 21:03:28, 2018-06-19 21:03:29, 2018-06-19 21:03:30, 2018-06-19 21:03:31, 2018-06-19 21:03:32, 2018-06-19 21:03:33, 2018-06-19 21:03:34, 2018-06-19 21:03:35, 2018-06-19 21:03:36, 2018-06-19 21:03:37, 2018-06-19 21:03:38, 2018-06-19 21:03:39, 2018-06-19 21:03:40, 2018-06-19 21:03:41, 2018-06-19 21:03:42, 2018-06-19 21:03:43]
Looking at the log, you can see which range of times each window contains.
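As a quick sanity check, the first and last timestamps of the two "contain times" lines above can be compared with java.time. The sketch below hard-codes those four values from the log; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class WindowLogCheck {
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    // Seconds elapsed between two timestamps written in the log's format.
    static long secondsBetween(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, FORMAT), LocalDateTime.parse(to, FORMAT)).getSeconds();
    }

    public static void main(String[] args) {
        // First and last entries of the two "contain times" lines in the Sink log.
        String firstStart = "2018-06-19 21:02:34";
        String firstEnd = "2018-06-19 21:03:34";
        String secondStart = "2018-06-19 21:02:44";
        String secondEnd = "2018-06-19 21:03:43";

        System.out.println("first window span  = " + secondsBetween(firstStart, firstEnd) + "s");   // 60s
        System.out.println("second window span = " + secondsBetween(secondStart, secondEnd) + "s"); // 59s
        System.out.println("slide              = " + secondsBetween(firstStart, secondStart) + "s"); // 10s
    }
}
```

Both windows cover roughly one minute and the second starts 10 seconds after the first, which matches the configured window size and slide interval. The timestamps are tweet creation times truncated to seconds, so an off-by-one second at the edges is to be expected.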