CLOVER🍀

That was when it all began.

Apache KafkaのQuickstartのサンプルを、JavaのClient APIで書き直してみた

Apache KafkaのClient APIを使った、いわゆる「Hello World」的なことを試してみよう…と思ったのですが、
ドキュメントを見てもそれほど書いてありません。

Documentation / APIS

Maven Dependency

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.2.0</version>
		</dependency>

Configurationといったところが主な内容ですが。

Documentation / Configuration / Producer Configs

Documentation / Configuration / Consumer Configs

あとは、リポジトリ上に含まれている、examplesを見るのがよいでしょうか?

kafka/examples/src/main/java/kafka/examples at 0.10.2.0 · apache/kafka · GitHub

あんまり情報ないですけど、このあたりを参考にQuickstartで書かれていた

Apache Kafka

簡単なサンプル相当のものをClient APIを使って書いてみたいと思います。

Producer。

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Consumer。

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

準備

まずは、Maven依存関係から。先ほど記載したとおり、「kafka-clients」への依存関係を追加します。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.6.2</version>
            <scope>test</scope>
        </dependency>

テストコードでサンプルは書いてみようと思いますので、JUnitとAssertJも追加。

Topicもあらかじめ作成しておきます。今回使うTopicの名前は、「my-topic」としました。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic

あと、こちらの実行上の都合ですが、別ホストで起動させたApache KafkaのBrokerにProducerおよびConsumerからの
接続をしたいと思いますので、config/server.propertiesでのlistenersには次のようにしておきます。

listeners=PLAINTEXT://[Apache Kafkaが稼働しているサーバーのIPアドレス]:9092

今回のサーバーIPアドレスは、「172.17.0.2」とします。

listeners=PLAINTEXT://172.17.0.2:9092

では、この環境に対してApache KafkaのProducerとConsumerを書いていきましょう。

テストコードの雛形

テストコードの雛形は、以下とします。
src/test/java/org/littlewings/kafka/GettingStartedTest.java

package org.littlewings.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class GettingStartedTest {
    // ここに、テストコードを書く!
}

このテストクラスの中に、ProducerとConsumerを使ったテストメソッドを書いていきます。

Producer

まずは、Producerから。

こんな感じに文字列を放り込むようにいってみましょう。

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Producerを作るにあたり、参考にしたのは次の情報です。

https://github.com/apache/kafka/blob/0.10.2.0/examples/src/main/java/kafka/examples/Producer.java

また、Javadocを見るのもよいでしょう。

kafka 0.10.2.1 API

むしろ、これくらいしかない気もしますが…。

で、できあがったのがこちら。

    @Test
    public void gettingStartedProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.2:9092");

        try (KafkaProducer<Integer, String> producer =
                     new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {

            IntStream
                    .rangeClosed(1, 10)
                    .forEach(i -> {
                        try {
                            producer.send(new ProducerRecord<>("my-topic", i, "value" + i)).get();
                        } catch (InterruptedException | ExecutionException e) {
                            throw new RuntimeException(e);
                        }
                    });
        }
    }

Propertiesでいろいろ設定ができるのですが、今回は接続先のサーバーのみ指定しました。定数値を使っていますが、内容は
「bootstrap.servers」です。

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.2:9092");

全設定項目は、こちらを参照しましょう。

Documentation / Configuration / Producer Configs

Serializerは、Propertiesでも設定できるのですが、今回はKafkaProducerのコンストラクタの引数として
設定しました。

        try (KafkaProducer<Integer, String> producer =
                     new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {

Apache Kafkaに登録するレコードとして、KeyとValueのSerializerがそれぞれ必要になります。今回は、KeyをInteger、
ValueをStringとします。

で、あとはProducerRecordを作成して、KafkaProducer#sendで送り込みます。

            IntStream
                    .rangeClosed(1, 10)
                    .forEach(i -> {
                        try {
                            producer.send(new ProducerRecord<>("my-topic", i, "value" + i)).get();
                        } catch (InterruptedException | ExecutionException e) {
                            throw new RuntimeException(e);
                        }
                    });

ProducerRecordのコンストラクタには、Topic名、Key、Valueをそれぞれ設定します。KafkaProducer#sendの戻り値はFutureなので、
今回はそれぞれFuture#getで待ち合わせるようにしました。

これで、Producer側はおしまいです。

Consumer

続いて、Consumer。こちらに似た内容を目指します。

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

「--from-beginning」があるので、「Topicの最初から読む」ですね。

Comsumer側も参考にしたのは、examplesのコードと

https://github.com/apache/kafka/blob/0.10.2.0/examples/src/main/java/kafka/examples/Consumer.java

Javadocになります。

kafka 0.10.2.1 API

もちろん、設定についてのドキュメントも。

Documentation / Configuration / Consumer Configs

で、できあがったコードはこちら。

    @Test
    public void gettingStartedConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.2:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<Integer, String> consumer =
                new KafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer());

        List<ConsumerRecord<Integer, String>> received = new ArrayList<>();

        consumer.subscribe(Arrays.asList("my-topic"));

        while (received.size() < 10) {
            ConsumerRecords<Integer, String> records = consumer.poll(1000L);

            records.forEach(received::add);
        }

        assertThat(received)
                .hasSize(10);
        assertThat(received.stream().map(c -> c.value()).collect(Collectors.toList()))
                .containsExactly(IntStream.rangeClosed(1, 10).mapToObj(i -> "value" + i).toArray(String[]::new));
    }

まずは、Producerと同様、Consumer側も設定としてPropertiesにいろいろ登録していきます。

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.2:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Consumer側は、少なくとも「bootstrap.servers」と「group.id」の指定が必要なようです。

また、「auto.offset.reset」についても「earliest」と指定しているのですが、こちらは「--from-beginning」と同じ動きを
させるために指定しています。指定しない場合のデフォルト値は「latest」となり、Consumerが起動した跡にTopicに登録
されたデータを読み出していくことになります。

レコードのKey/Valueは、IntegerとStringなのでProducerと同様に指定。

        KafkaConsumer<Integer, String> consumer =
                new KafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer());

KafkaConsumerで、subscribeするTopicの名前(複数可)を指定します。

        consumer.subscribe(Arrays.asList("my-topic"));

あとは、KafkaConsumer#pollでメッセージの受信を待ちます。今回は、10件以上受信したらpollingをやめるようにしています。
というか、Producer側では10件登録しますので。

        while (received.size() < 10) {
            ConsumerRecords<Integer, String> records = consumer.poll(1000L);

            records.forEach(received::add);
        }

これで、なんとか動かせましたよ、と。

まとめ

Apache KafkaのClient APIを使って、Quickstart相当っぽいサンプルを書いてみました。

とにかくドキュメントにあんまり情報がないのでやり方がわからず苦労しましたが、KafkaProducer/KafkaConsumerともにメソッド数も
それほど多くないので、あとは設定できる項目とかを見ながらなれていく感じなのでしょうかね。

とりあえず、とっかかりとしてはこんなところで。