Apache KafkaのClient APIを使った、いわゆる「Hello World」的なことを試してみよう…と思ったのですが、
ドキュメントを見てもそれほど書いてありません。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency>
Configurationといったところが主な内容ですが。
Documentation / Configuration / Producer Configs
Documentation / Configuration / Consumer Configs
あとは、リポジトリ上に含まれている、examplesを見るのがよいでしょうか?
kafka/examples/src/main/java/kafka/examples at 0.10.2.0 · apache/kafka · GitHub
あんまり情報ないですけど、このあたりを参考にQuickstartで書かれていた
簡単なサンプル相当のものをClient APIを使って書いてみたいと思います。
Producer。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
Consumer。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
準備
まずは、Maven依存関係から。先ほど記載したとおり、「kafka-clients」への依存関係を追加します。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.6.2</version> <scope>test</scope> </dependency>
テストコードでサンプルは書いてみようと思いますので、JUnitとAssertJも追加。
Topicもあらかじめ作成しておきます。今回使うTopicの名前は、「my-topic」としました。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
あと、こちらの実行上の都合ですが、別ホストで起動させたApache KafkaのBrokerにProducerおよびConsumerからの
接続をしたいと思いますので、config/server.propertiesでのlistenersには次のようにしておきます。
listeners=PLAINTEXT://[Apache Kafkaが稼働しているサーバーのIPアドレス]:9092
今回のサーバーのIPアドレスは、「172.17.0.2」とします。
listeners=PLAINTEXT://172.17.0.2:9092
では、この環境に対してApache KafkaのProducerとConsumerを書いていきましょう。
テストコードの雛形
テストコードの雛形は、以下とします。
src/test/java/org/littlewings/kafka/GettingStartedTest.java
package org.littlewings.kafka; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; public class GettingStartedTest { // ここに、テストコードを書く! }
このテストクラスの中に、ProducerとConsumerを使ったテストメソッドを書いていきます。
Producer
まずは、Producerから。
こんな感じに文字列を放り込むようにいってみましょう。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
Producerを作るにあたり、参考にしたのは次の情報です。
https://github.com/apache/kafka/blob/0.10.2.0/examples/src/main/java/kafka/examples/Producer.java
また、Javadocを見るのもよいでしょう。
むしろ、これくらいしかない気もしますが…。
で、できあがったのがこちら。
@Test public void gettingStartedProducer() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.2:9092"); try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) { IntStream .rangeClosed(1, 10) .forEach(i -> { try { producer.send(new ProducerRecord<>("my-topic", i, "value" + i)).get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); } }
Propertiesでいろいろ設定ができるのですが、今回は接続先のサーバーのみ指定しました。定数値を使っていますが、内容は
「bootstrap.servers」です。
Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.2:9092");
全設定項目は、こちらを参照しましょう。
Documentation / Configuration / Producer Configs
Serializerは、Propertiesでも設定できるのですが、今回はKafkaProducerのコンストラクタの引数として
設定しました。
try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {
Apache Kafkaに登録するレコードとして、KeyとValueのSerializerがそれぞれ必要になります。今回は、KeyをInteger、
ValueをStringとします。
で、あとはProducerRecordを作成して、KafkaProducer#sendで送り込みます。
IntStream .rangeClosed(1, 10) .forEach(i -> { try { producer.send(new ProducerRecord<>("my-topic", i, "value" + i)).get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } });
ProducerRecordのコンストラクタには、Topic名、Key、Valueをそれぞれ設定します。KafkaProducer#sendの戻り値はFutureなので、
今回はそれぞれFuture#getで待ち合わせるようにしました。
これで、Producer側はおしまいです。
Consumer
続いて、Consumer。こちらに似た内容を目指します。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
「--from-beginning」があるので、「Topicの最初から読む」ですね。
Comsumer側も参考にしたのは、examplesのコードと
https://github.com/apache/kafka/blob/0.10.2.0/examples/src/main/java/kafka/examples/Consumer.java
Javadocになります。
もちろん、設定についてのドキュメントも。
Documentation / Configuration / Consumer Configs
で、できあがったコードはこちら。
@Test public void gettingStartedConsumer() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.2:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer()); List<ConsumerRecord<Integer, String>> received = new ArrayList<>(); consumer.subscribe(Arrays.asList("my-topic")); while (received.size() < 10) { ConsumerRecords<Integer, String> records = consumer.poll(1000L); records.forEach(received::add); } assertThat(received) .hasSize(10); assertThat(received.stream().map(c -> c.value()).collect(Collectors.toList())) .containsExactly(IntStream.rangeClosed(1, 10).mapToObj(i -> "value" + i).toArray(String[]::new)); }
まずは、Producerと同様、Consumer側も設定としてPropertiesにいろいろ登録していきます。
Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.0.2:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer側は、少なくとも「bootstrap.servers」と「group.id」の指定が必要なようです。
また、「auto.offset.reset」についても「earliest」と指定しているのですが、こちらは「--from-beginning」と同じ動きを
させるために指定しています。指定しない場合のデフォルト値は「latest」となり、Consumerが起動した跡にTopicに登録
されたデータを読み出していくことになります。
レコードのKey/Valueは、IntegerとStringなのでProducerと同様に指定。
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer());
KafkaConsumerで、subscribeするTopicの名前(複数可)を指定します。
consumer.subscribe(Arrays.asList("my-topic"));
あとは、KafkaConsumer#pollでメッセージの受信を待ちます。今回は、10件以上受信したらpollingをやめるようにしています。
というか、Producer側では10件登録しますので。
while (received.size() < 10) { ConsumerRecords<Integer, String> records = consumer.poll(1000L); records.forEach(received::add); }
これで、なんとか動かせましたよ、と。