Apache Kafkaに含まれる、Apache Kafkaを使ったストリーム処理向けのライブラリ、Kafka Streamsを試してみます。
Kafka Streamsについては、こちら。
Kafka Streams
参考にした日本語情報。
Kafka StreamsをSpring Bootで試してみた - Mitsuyuki.Shiiba
オフィシャルサイトで、今回試した範囲でざっと気になるあたり…。
Core Concepts
Configuring a Streams Application
とまあ、いろいろあるのですが、小難しいことは置いておいて、とりあえず動かしてみましょう。
準備
Kafka Streamsを扱うのに必要な、Maven依存関係はこちら。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency>
お題は、オフィシャルのサンプルと同様に、Word Countにしてみましょう。
元ネタは、オフィシャルサイトのコンセプトのドキュメントにします。GitHubから。
https://raw.githubusercontent.com/apache/kafka/1.0/docs/streams/core-concepts.html
このページの取得、解析にはjsoupとApache Luceneを使うとしましょう。
<dependency> <groupId>org.jsoup</groupId> <artifactId>jsoup</artifactId> <version>1.11.2</version> </dependency> <dependency> <groupId>org.apache.lucene</groupId> <artifactId>lucene-core</artifactId> <version>7.2.0</version> </dependency>
あとは、テストライブラリを。
<dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.0.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.9.0</version> </dependency>
Apache KafkaのTopicsは、以下の2つを作成。
$ bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test-topic $ bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic word-count-topic
「test-topic」がKafka Streamsのコンセプトのドキュメントのテキストを登録するTopicで、「word-count-topic」がWord Countした結果を保存する
Topicです。
テストコードの雛形
作成したテストコードの雛形は、こちら。
src/test/java/org/littlewings/kafka/streams/KafkaStreamsTest.java
package org.littlewings.kafka.streams; import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Properties; import java.util.Spliterator; import java.util.Spliterators; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.jsoup.nodes.Element; import org.jsoup.select.Elements; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; public class KafkaStreamsTest { // ここに、テストを書く! }
とりあえず、データを入れてみる
まずは、データを入れておきます。ここは、Kafka Streamsは関係ないのでさらっと。
データは、「test-topic」に放り込んでいます。
List<String> loadKafkaStreamsConcept() { try { Document conceptDocument = Jsoup.connect("https://raw.githubusercontent.com/apache/kafka/1.0/docs/streams/core-concepts.html").get(); Elements contentTemplate = conceptDocument.select("#content-template"); Document inTemplateDocument = Jsoup.parse(contentTemplate.toString().replace("<script id=\"content-template\" type=\"text/x-handlebars-template\">", "").replace("</script>", "")); return inTemplateDocument.select("p").stream().map(Element::text).collect(Collectors.toList()); } catch (IOException e) { throw new UncheckedIOException(e); } } void loadTextsToTopic(String topicName, List<String> texts) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092"); try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) { texts.forEach(text -> { try { producer.send(new ProducerRecord<>(topicName, null, text)).get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); } } @Test public void loadData() { loadTextsToTopic("test-topic", loadKafkaStreamsConcept()); }
jsoupでデータをパース後、scriptタグを削除したりしていますが、これはHandlebarsのテンプレートになっていて、ここからHTMLを抜き出してセレクタで
評価できる形にしたかったからです。
初めてのKafka Streams
それでは、Kafka Streamsを使ったコードを書いてみます。
まずは、Kafka Streamsnのドキュメントサンプルをほぼそのまま真似ました。
Kafka Streams
書いたコードは、こちら。
@Test public void wordCount() throws IOException { Properties properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092"); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> texts = builder.stream("test-topic"); KTable<String, Long> wordCounts = texts .flatMapValues(value -> tokenize(value)) .groupBy((key, word) -> word) .count(Materialized.as("counts-store")); //wordCounts.toStream().to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long())); wordCounts.toStream().to("word-count-topic", Produced.valueSerde(Serdes.Long())); KafkaStreams streams = new KafkaStreams(builder.build(), properties); streams.start(); try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { // ignore } streams.close(); }
tokenizeしている部分は、こういう感じになっています。Apache LuceneのStandardAnalyzerでアナライズして単語に分解します。
List<String> tokenize(String text) { Analyzer analyzer = new StandardAnalyzer(); TokenStream tokenStream = analyzer.tokenStream("", text); CharTermAttribute charTermAttribute = tokenStream.addAttribute(CharTermAttribute.class); try { tokenStream.reset(); List<String> tokens = new ArrayList<>(); while (tokenStream.incrementToken()) { tokens.add(charTermAttribute.toString()); } tokenStream.end(); tokenStream.close(); return tokens; } catch (IOException e) { throw new UncheckedIOException(e); } }
最初に、Apache Kafkaへの接続情報やSerialize/Deserializeで使うデフォルトのクラスなどを設定します。
Properties properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092"); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
「wordcount-application」は今回のアプリケーション名で、コミット間隔は100msecにしています。
StreamsBuilderを使い、「test-topic」を起点にKStreamを作成し、Stream APIのように処理を書いていきます。Word Countなので、最後はcount
なのですが、その際にMaterializedを使用するようです。
StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> texts = builder.stream("test-topic"); KTable<String, Long> wordCounts = texts .flatMapValues(value -> tokenize(value)) .groupBy((key, word) -> word) .count(Materialized.as("counts-store"));
そして、このStreamの結果を別のTopicに書き出します。ここでは、最初に用意した「word-count-topic」に書き込むようにします。
この時に
//wordCounts.toStream().to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long())); wordCounts.toStream().to("word-count-topic", Produced.valueSerde(Serdes.Long()));
toメソッドでTopicに書き出す際に、デフォルトのSerializer/Deserializaerを変更することができます。ドキュメントではProduced#withを使用して
キーと値の両方を変更していますが、Produced#valueSerdeのように値に適用するSerializer/Deserializerのみの変更を行うなどもできるようです。
「Serdes」ってなんだろうと思っていたのですが、Serialize/Deserializerのそれぞれ先頭3文字から取ったものみたいですね。
ここまで定義したら、KafkaStreamsのインスタンスを作成してstart。
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
すぐに終わってしまうと困るので、今回はsleepさせておきます…。
try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { // ignore }
本来は実行し続ける感じになるので、ドキュメントではSIGTERM送信時にRuntime#shutdownHookを使ってKafkaStreams#closeを呼び出す例が記載されています。
Writing a Streams Application
KafkaStreamsのインスタンスは、使い終わったらclose。
streams.close();
なお、このコードを実行すると、作成したTopic以外に2つのTopicが作られるようです。
それぞれ、「wordcount-application-counts-store-changelog」と「wordcount-application-counts-store-repartition」という名前。
アプリケーションの名前と、count時に指定したstoreの名前からきているようですね。
とりあえず、今回は置いておきます…。
Query
どうやら、Query(…というより、Iteratorですが)も使用できるとか。
Interactive Queries
こちらも試してみます。
@Test public void query() { Properties properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092"); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000"); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> texts = builder.stream("test-topic"); KTable<String, Long> wordCounts = texts .flatMapValues(value -> tokenize(value)) .groupBy((key, word) -> word) .count(Materialized.as("counts-store")); //wordCounts.toStream().to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long())); wordCounts.toStream().to("word-count-topic", Produced.valueSerde(Serdes.Long())); KafkaStreams streams = new KafkaStreams(builder.build(), properties); streams.start(); try { TimeUnit.SECONDS.sleep(10L); } catch (InterruptedException e) { // ignore } ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store("counts-store", QueryableStoreTypes.keyValueStore()); KeyValueIterator<String, Long> range = keyValueStore.all(); List<KeyValue<String, Long>> results = StreamSupport .stream(Spliterators.spliteratorUnknownSize(range, Spliterator.IMMUTABLE), false) .collect(Collectors.toList()); streams.close(); List<KeyValue<String, Long>> top10 = results .stream() .sorted(Comparator.comparing((KeyValue<String, Long> kv) -> kv.value).reversed()) .collect(Collectors.toList()) .subList(0, 10); top10.forEach(System.out::println); assertThat(top10.get(0)).isEqualTo(new KeyValue<>("kafka", 36L)); assertThat(top10.get(1)).isEqualTo(new KeyValue<>("processing", 26L)); assertThat(top10.get(2)).isEqualTo(new KeyValue<>("streams", 24L)); assertThat(top10.get(3)).isEqualTo(new KeyValue<>("time", 23L)); assertThat(top10.get(4)).isEqualTo(new KeyValue<>("stream", 20L)); assertThat(top10.get(5)).isEqualTo(new KeyValue<>("state", 12L)); assertThat(top10.get(6)).isEqualTo(new KeyValue<>("data", 11L)); assertThat(top10.get(7)).isEqualTo(new KeyValue<>("once", 10L)); assertThat(top10.get(8)).isEqualTo(new KeyValue<>("stores", 10L)); assertThat(top10.get(9)).isEqualTo(new KeyValue<>("application", 9L)); }
KafkaStreamsを作成して実行するところまでは先ほどとほぼ同じで、KafkaStreamsのインスタンスと途中で使った「counts-store」を使って
ReadOnlyKeyValueStoreのインスタンスを取得します。
ReadOnlyKeyValueStore<String, Long> keyValueStore =
streams.store("counts-store", QueryableStoreTypes.keyValueStore());
KeyValueIterator<String, Long> range = keyValueStore.all();
ここから、KeyValueIteratorを取得することができるので、あとはIteratorとして使えばOKです。
今回はReadOnlyKeyValueStore#allを使って全レコードを取得しましたが、キーのみを指定したgetや、範囲を指定したrangeなどもあります。
https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.html
その他、SessionStoreやWindowStoreなどもあるようなので、また気が向いたら…。
とりあえず、全レコードをListにしてKafkaStreams#close。
List<KeyValue<String, Long>> results =
StreamSupport
.stream(Spliterators.spliteratorUnknownSize(range, Spliterator.IMMUTABLE), false)
.collect(Collectors.toList());
streams.close();
上位10件くらいをちょっと強引に抜き出して、見てみます。
List<KeyValue<String, Long>> top10 = results .stream() .sorted(Comparator.comparing((KeyValue<String, Long> kv) -> kv.value).reversed()) .collect(Collectors.toList()) .subList(0, 10); top10.forEach(System.out::println); assertThat(top10.get(0)).isEqualTo(new KeyValue<>("kafka", 36L)); assertThat(top10.get(1)).isEqualTo(new KeyValue<>("processing", 26L)); assertThat(top10.get(2)).isEqualTo(new KeyValue<>("streams", 24L)); assertThat(top10.get(3)).isEqualTo(new KeyValue<>("time", 23L)); assertThat(top10.get(4)).isEqualTo(new KeyValue<>("stream", 20L)); assertThat(top10.get(5)).isEqualTo(new KeyValue<>("state", 12L)); assertThat(top10.get(6)).isEqualTo(new KeyValue<>("data", 11L)); assertThat(top10.get(7)).isEqualTo(new KeyValue<>("once", 10L)); assertThat(top10.get(8)).isEqualTo(new KeyValue<>("stores", 10L)); assertThat(top10.get(9)).isEqualTo(new KeyValue<>("application", 9L));
直接見てみる?
KafkaStreamsを使って書き出したTopic「word-count-topic」を、Consumerを使って直接見たらどうなるか気になったので、ちょっと見てみました。
@Test public void assertWriteBackedTopic() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new LongDeserializer())) { consumer.subscribe(Arrays.asList("word-count-topic")); ConsumerRecords<String, Long> records = consumer.poll(1000L); assertThat(records.count()).isEqualTo(373); Iterator<ConsumerRecord<String, Long>> iterator = records.iterator(); List<ConsumerRecord<String, Long>> recordsList = StreamSupport .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.IMMUTABLE), false) .sorted(Comparator.comparing((ConsumerRecord<String, Long> record) -> record.value()).reversed()) .collect(Collectors.toList()); recordsList.forEach(System.out::println); assertThat(recordsList.get(0).key()).isEqualTo("kafka"); assertThat(recordsList.get(0).value()).isEqualTo(36L); assertThat(recordsList.get(1).key()).isEqualTo("processing"); assertThat(recordsList.get(1).value()).isEqualTo(26L); assertThat(recordsList.get(2).key()).isEqualTo("streams"); assertThat(recordsList.get(2).value()).isEqualTo(24L); } }
いたって普通に見れました。まあ、Word Countした結果が入っていますね。
「wordcount-application-counts-store-changelog」についても見てみましたが、そう変わらず。
@Test public void assertStateTopic() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new LongDeserializer())) { consumer.subscribe(Arrays.asList("wordcount-application-counts-store-changelog")); ConsumerRecords<String, Long> records = consumer.poll(1000L); assertThat(records.count()).isEqualTo(373); Iterator<ConsumerRecord<String, Long>> iterator = records.iterator(); List<ConsumerRecord<String, Long>> recordsList = StreamSupport .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.IMMUTABLE), false) .sorted(Comparator.comparing((ConsumerRecord<String, Long> record) -> record.value()).reversed()) .collect(Collectors.toList()); recordsList.forEach(System.out::println); assertThat(recordsList.get(0).key()).isEqualTo("kafka"); assertThat(recordsList.get(0).value()).isEqualTo(36L); assertThat(recordsList.get(1).key()).isEqualTo("processing"); assertThat(recordsList.get(1).value()).isEqualTo(26L); assertThat(recordsList.get(2).key()).isEqualTo("streams"); assertThat(recordsList.get(2).value()).isEqualTo(24L); } }
ちょっとこのあたりの話はちゃんと見ていないので、おいおい…。
まとめ
とりあえず、Kafka Streamsを試してみました。コードを書くのはそう迷いませんでしたが、Apache Kafkaの挙動と合わさって動作確認はちょっと
てこずりました…。
けっこう面白そうなので、少しずつ見ていってみましょう。