CLOVER🍀

That was when it all began.

初めてのKafka Streams

Apache Kafkaに含まれる、Apache Kafkaを使ったストリーム処理向けのライブラリ、Kafka Streamsを試してみます。

Kafka Streamsについては、こちら。
Kafka Streams

参考にした日本語情報。
Kafka StreamsをSpring Bootで試してみた - Mitsuyuki.Shiiba

噛み砕いてKafka Streams #kafkajp

Kafka Streams

オフィシャルサイトで、今回試した範囲でざっと気になるあたり…。
Core Concepts

Kafka Streams Configs

Configuring a Streams Application

Interactive Queries

とまあ、いろいろあるのですが、小難しいことは置いておいて、とりあえず動かしてみましょう。

準備

Kafka Streamsを扱うのに必要な、Maven依存関係はこちら。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.0</version>
        </dependency>

お題は、オフィシャルのサンプルと同様に、Word Countにしてみましょう。

元ネタは、オフィシャルサイトのコンセプトのドキュメントにします。GitHubから。
https://raw.githubusercontent.com/apache/kafka/1.0/docs/streams/core-concepts.html

このページの取得、解析にはjsoupとApache Luceneを使うとしましょう。

        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-core</artifactId>
            <version>7.2.0</version>
        </dependency>

あとは、テストライブラリを。

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.0.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.9.0</version>
        </dependency>

Apache KafkaのTopicsは、以下の2つを作成。

$ bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test-topic
$ bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic word-count-topic

「test-topic」がKafka Streamsのコンセプトのドキュメントのテキストを登録するTopicで、「word-count-topic」がWord Countした結果を保存する
Topicです。

テストコードの雛形

作成したテストコードの雛形は、こちら。
src/test/java/org/littlewings/kafka/streams/KafkaStreamsTest.java

package org.littlewings.kafka.streams;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class KafkaStreamsTest {
    // ここに、テストを書く!
}

とりあえず、データを入れてみる

まずは、データを入れておきます。ここは、Kafka Streamsは関係ないのでさらっと。
データは、「test-topic」に放り込んでいます。

    List<String> loadKafkaStreamsConcept() {
        try {
            Document conceptDocument =
                    Jsoup.connect("https://raw.githubusercontent.com/apache/kafka/1.0/docs/streams/core-concepts.html").get();

            Elements contentTemplate = conceptDocument.select("#content-template");
            Document inTemplateDocument =
                    Jsoup.parse(contentTemplate.toString().replace("<script id=\"content-template\" type=\"text/x-handlebars-template\">", "").replace("</script>", ""));

            return inTemplateDocument.select("p").stream().map(Element::text).collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void loadTextsToTopic(String topicName, List<String> texts) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092");

        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
            texts.forEach(text -> {
                try {
                    producer.send(new ProducerRecord<>(topicName, null, text)).get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

    @Test
    public void loadData() {
        loadTextsToTopic("test-topic", loadKafkaStreamsConcept());
    }

jsoupでデータをパース後、scriptタグを削除したりしていますが、これはHandlebarsのテンプレートになっていて、ここからHTMLを抜き出してセレクタ
評価できる形にしたかったからです。

初めてのKafka Streams

それでは、Kafka Streamsを使ったコードを書いてみます。

まずは、Kafka Streamsnのドキュメントサンプルをほぼそのまま真似ました。
Kafka Streams

書いたコードは、こちら。

    @Test
    public void wordCount() throws IOException {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> texts = builder.stream("test-topic");

        KTable<String, Long> wordCounts =
                texts
                        .flatMapValues(value -> tokenize(value))
                        .groupBy((key, word) -> word)
                        .count(Materialized.as("counts-store"));

        //wordCounts.toStream().to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
        wordCounts.toStream().to("word-count-topic", Produced.valueSerde(Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();

        try {
            TimeUnit.SECONDS.sleep(5L);
        } catch (InterruptedException e) {
            // ignore
        }

        streams.close();
    }

tokenizeしている部分は、こういう感じになっています。Apache LuceneのStandardAnalyzerでアナライズして単語に分解します。

    List<String> tokenize(String text) {
        Analyzer analyzer = new StandardAnalyzer();
        TokenStream tokenStream = analyzer.tokenStream("", text);
        CharTermAttribute charTermAttribute = tokenStream.addAttribute(CharTermAttribute.class);
        try {
            tokenStream.reset();

            List<String> tokens = new ArrayList<>();
            while (tokenStream.incrementToken()) {
                tokens.add(charTermAttribute.toString());
            }

            tokenStream.end();
            tokenStream.close();

            return tokens;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

最初に、Apache Kafkaへの接続情報やSerialize/Deserializeで使うデフォルトのクラスなどを設定します。

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");

「wordcount-application」は今回のアプリケーション名で、コミット間隔は100msecにしています。

StreamsBuilderを使い、「test-topic」を起点にKStreamを作成し、Stream APIのように処理を書いていきます。Word Countなので、最後はcount
なのですが、その際にMaterializedを使用するようです。

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> texts = builder.stream("test-topic");

        KTable<String, Long> wordCounts =
                texts
                        .flatMapValues(value -> tokenize(value))
                        .groupBy((key, word) -> word)
                        .count(Materialized.as("counts-store"));

そして、このStreamの結果を別のTopicに書き出します。ここでは、最初に用意した「word-count-topic」に書き込むようにします。
この時に

        //wordCounts.toStream().to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
        wordCounts.toStream().to("word-count-topic", Produced.valueSerde(Serdes.Long()));

toメソッドでTopicに書き出す際に、デフォルトのSerializer/Deserializaerを変更することができます。ドキュメントではProduced#withを使用して
キーと値の両方を変更していますが、Produced#valueSerdeのように値に適用するSerializer/Deserializerのみの変更を行うなどもできるようです。

「Serdes」ってなんだろうと思っていたのですが、Serialize/Deserializerのそれぞれ先頭3文字から取ったものみたいですね。

ここまで定義したら、KafkaStreamsのインスタンスを作成してstart。

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();

すぐに終わってしまうと困るので、今回はsleepさせておきます…。

        try {
            TimeUnit.SECONDS.sleep(5L);
        } catch (InterruptedException e) {
            // ignore
        }

本来は実行し続ける感じになるので、ドキュメントではSIGTERM送信時にRuntime#shutdownHookを使ってKafkaStreams#closeを呼び出す例が記載されています。
Writing a Streams Application

KafkaStreamsのインスタンスは、使い終わったらclose。

        streams.close();

なお、このコードを実行すると、作成したTopic以外に2つのTopicが作られるようです。

それぞれ、「wordcount-application-counts-store-changelog」と「wordcount-application-counts-store-repartition」という名前。
アプリケーションの名前と、count時に指定したstoreの名前からきているようですね。

とりあえず、今回は置いておきます…。

Query

どうやら、Query(…というより、Iteratorですが)も使用できるとか。
Interactive Queries

こちらも試してみます。

    @Test
    public void query() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> texts = builder.stream("test-topic");

        KTable<String, Long> wordCounts =
                texts
                        .flatMapValues(value -> tokenize(value))
                        .groupBy((key, word) -> word)
                        .count(Materialized.as("counts-store"));

        //wordCounts.toStream().to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));
        wordCounts.toStream().to("word-count-topic", Produced.valueSerde(Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();

        try {
            TimeUnit.SECONDS.sleep(10L);
        } catch (InterruptedException e) {
            // ignore
        }

        ReadOnlyKeyValueStore<String, Long> keyValueStore =
                streams.store("counts-store", QueryableStoreTypes.keyValueStore());

        KeyValueIterator<String, Long> range = keyValueStore.all();
        List<KeyValue<String, Long>> results =
                StreamSupport
                        .stream(Spliterators.spliteratorUnknownSize(range, Spliterator.IMMUTABLE), false)
                        .collect(Collectors.toList());

        streams.close();

        List<KeyValue<String, Long>> top10 =
                results
                        .stream()
                        .sorted(Comparator.comparing((KeyValue<String, Long> kv) -> kv.value).reversed())
                        .collect(Collectors.toList())
                        .subList(0, 10);

        top10.forEach(System.out::println);

        assertThat(top10.get(0)).isEqualTo(new KeyValue<>("kafka", 36L));
        assertThat(top10.get(1)).isEqualTo(new KeyValue<>("processing", 26L));
        assertThat(top10.get(2)).isEqualTo(new KeyValue<>("streams", 24L));
        assertThat(top10.get(3)).isEqualTo(new KeyValue<>("time", 23L));
        assertThat(top10.get(4)).isEqualTo(new KeyValue<>("stream", 20L));
        assertThat(top10.get(5)).isEqualTo(new KeyValue<>("state", 12L));
        assertThat(top10.get(6)).isEqualTo(new KeyValue<>("data", 11L));
        assertThat(top10.get(7)).isEqualTo(new KeyValue<>("once", 10L));
        assertThat(top10.get(8)).isEqualTo(new KeyValue<>("stores", 10L));
        assertThat(top10.get(9)).isEqualTo(new KeyValue<>("application", 9L));
    }

KafkaStreamsを作成して実行するところまでは先ほどとほぼ同じで、KafkaStreamsのインスタンスと途中で使った「counts-store」を使って
ReadOnlyKeyValueStoreのインスタンスを取得します。

        ReadOnlyKeyValueStore<String, Long> keyValueStore =
                streams.store("counts-store", QueryableStoreTypes.keyValueStore());

        KeyValueIterator<String, Long> range = keyValueStore.all();

ここから、KeyValueIteratorを取得することができるので、あとはIteratorとして使えばOKです。

今回はReadOnlyKeyValueStore#allを使って全レコードを取得しましたが、キーのみを指定したgetや、範囲を指定したrangeなどもあります。
https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.html

その他、SessionStoreやWindowStoreなどもあるようなので、また気が向いたら…。

とりあえず、全レコードをListにしてKafkaStreams#close。

        List<KeyValue<String, Long>> results =
                StreamSupport
                        .stream(Spliterators.spliteratorUnknownSize(range, Spliterator.IMMUTABLE), false)
                        .collect(Collectors.toList());

        streams.close();

上位10件くらいをちょっと強引に抜き出して、見てみます。

        List<KeyValue<String, Long>> top10 =
                results
                        .stream()
                        .sorted(Comparator.comparing((KeyValue<String, Long> kv) -> kv.value).reversed())
                        .collect(Collectors.toList())
                        .subList(0, 10);

        top10.forEach(System.out::println);

        assertThat(top10.get(0)).isEqualTo(new KeyValue<>("kafka", 36L));
        assertThat(top10.get(1)).isEqualTo(new KeyValue<>("processing", 26L));
        assertThat(top10.get(2)).isEqualTo(new KeyValue<>("streams", 24L));
        assertThat(top10.get(3)).isEqualTo(new KeyValue<>("time", 23L));
        assertThat(top10.get(4)).isEqualTo(new KeyValue<>("stream", 20L));
        assertThat(top10.get(5)).isEqualTo(new KeyValue<>("state", 12L));
        assertThat(top10.get(6)).isEqualTo(new KeyValue<>("data", 11L));
        assertThat(top10.get(7)).isEqualTo(new KeyValue<>("once", 10L));
        assertThat(top10.get(8)).isEqualTo(new KeyValue<>("stores", 10L));
        assertThat(top10.get(9)).isEqualTo(new KeyValue<>("application", 9L));

直接見てみる?

KafkaStreamsを使って書き出したTopic「word-count-topic」を、Consumerを使って直接見たらどうなるか気になったので、ちょっと見てみました。

    @Test
    public void assertWriteBackedTopic() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, Long> consumer =
                     new KafkaConsumer<>(properties, new StringDeserializer(), new LongDeserializer())) {
            consumer.subscribe(Arrays.asList("word-count-topic"));

            ConsumerRecords<String, Long> records = consumer.poll(1000L);

            assertThat(records.count()).isEqualTo(373);
            Iterator<ConsumerRecord<String, Long>> iterator = records.iterator();

            List<ConsumerRecord<String, Long>> recordsList =
                    StreamSupport
                            .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.IMMUTABLE), false)
                            .sorted(Comparator.comparing((ConsumerRecord<String, Long> record) -> record.value()).reversed())
                            .collect(Collectors.toList());

            recordsList.forEach(System.out::println);

            assertThat(recordsList.get(0).key()).isEqualTo("kafka");
            assertThat(recordsList.get(0).value()).isEqualTo(36L);

            assertThat(recordsList.get(1).key()).isEqualTo("processing");
            assertThat(recordsList.get(1).value()).isEqualTo(26L);

            assertThat(recordsList.get(2).key()).isEqualTo("streams");
            assertThat(recordsList.get(2).value()).isEqualTo(24L);
        }
    }

いたって普通に見れました。まあ、Word Countした結果が入っていますね。

「wordcount-application-counts-store-changelog」についても見てみましたが、そう変わらず。

    @Test
    public void assertStateTopic() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.25.0.3:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, Long> consumer =
                     new KafkaConsumer<>(properties, new StringDeserializer(), new LongDeserializer())) {
            consumer.subscribe(Arrays.asList("wordcount-application-counts-store-changelog"));

            ConsumerRecords<String, Long> records = consumer.poll(1000L);

            assertThat(records.count()).isEqualTo(373);
            Iterator<ConsumerRecord<String, Long>> iterator = records.iterator();

            List<ConsumerRecord<String, Long>> recordsList =
                    StreamSupport
                            .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.IMMUTABLE), false)
                            .sorted(Comparator.comparing((ConsumerRecord<String, Long> record) -> record.value()).reversed())
                            .collect(Collectors.toList());

            recordsList.forEach(System.out::println);

            assertThat(recordsList.get(0).key()).isEqualTo("kafka");
            assertThat(recordsList.get(0).value()).isEqualTo(36L);

            assertThat(recordsList.get(1).key()).isEqualTo("processing");
            assertThat(recordsList.get(1).value()).isEqualTo(26L);

            assertThat(recordsList.get(2).key()).isEqualTo("streams");
            assertThat(recordsList.get(2).value()).isEqualTo(24L);
        }
    }

ちょっとこのあたりの話はちゃんと見ていないので、おいおい…。

まとめ

とりあえず、Kafka Streamsを試してみました。コードを書くのはそう迷いませんでしたが、Apache Kafkaの挙動と合わさって動作確認はちょっと
てこずりました…。

けっこう面白そうなので、少しずつ見ていってみましょう。