最近、Apache Kafkaで遊んでいますが、こう使っているとEmbedded(組み込み)で動かしてみたくなるもの。テストとかで
使ったりとか。
なんかできそうだという感じだったので、チャレンジしてみました。
こちらに書いてありましたので。
Unit testing / How do I write unit tests using Kafka?
…と思ったのですが、最終的にこちらの内容はあまり使わなくなりました…。
目標
Apache Kafka(とApache Kafkaが必要とするApache ZooKeeperも)を、Unit Testコード内でEmbedddedに動かします。
テストコード内で、Brokerをクラスタ化するところまでを目標にします。
準備
Maven依存関係は、こちら。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>0.11.0.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> <classifier>test</classifier> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>0.11.0.0</version> <classifier>test</classifier> <scope>test</scope> </dependency>
けっこういろいろ要ります。Classifierがtestのものが必要なことに注意です。
あとは、テストコード用にJUnitとAssertJを追加します。
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.8.0</version> <scope>test</scope> </dependency>
準備はこんなところです。
Wikiに載っているApache Curatorは、不要です。
Apache Curator –
Apache Kafka自身が組み込みでApache ZooKeeperを持っています。
もしもApache Kafkaと一緒に使う場合は、Apache Kafkaが依存しているApache ZooKeeperとメジャーバージョンは合わせておきましょう。
https://github.com/apache/curator/blob/apache-curator-2.12.0/pom.xml#L64
下手に最新版を持ってくると、失敗します…。
テストコードの雛形
テストコードの雛形は、こんな感じにします。
src/test/java/org/littlewings/kafka/EmbeddedKafkaTest.java
package org.littlewings.kafka; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.IntStream; import kafka.zk.EmbeddedZookeeper; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; public class EmbeddedKafkaTest { // ここに、テストを書く!! }
Apache ZooKeeperをEmbeddedに使う
まずは、Apache ZooKeeperがないと始まりません。Apache KafkaのWikiでは、Apache ZooKeeperをEmbeddedに使うために
Apache Curatorを使っていますが、Apache Kafka自身がEmbeddedにApache ZooKeeperを使う方法を提供しているので、
そちらを使用すればOKです。
かなり簡単に作れます。
EmbeddedZookeeper zkServer = new EmbeddedZookeeper(); String zkConnectionString = "localhost:" + zkServer.port();
停止は、shutdownです。
zkServer.shutdown();
ポートは、ランダムに割り当てられます(TestUtils#RandomPortというのは0です)。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala#L34
データ、ログディレクトリは一時ディレクトリで、シャットダウン時に削除されます。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala#L29-L30
Apache ZooKeeperはこんなところです。
Apache KafkaをEmbeddedに使う
こちらも、Wikiとけっこう違っていてだいぶソースコードを追いました。
参考にしたり、中身を追ったりしたのはこちら。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/server/KafkaConfig.scala
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/utils/ZkUtils.scala
で、できあがったコードはこちら。
src/test/java/org/littlewings/kafka/EmbeddedKafkaServer.java
package org.littlewings.kafka; import java.io.IOException; import java.io.UncheckedIOException; import java.net.ServerSocket; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.stream.Collectors; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.utils.TestUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.apache.kafka.common.utils.Time; import scala.Option; import scala.collection.JavaConverters; import scala.collection.Seq; public class EmbeddedKafkaServer { KafkaServer kafkaServer; int port; ZkClient zkClient; ZkUtils zkUtils; public EmbeddedKafkaServer(KafkaServer kafkaServer, int port, ZkClient zkClient, ZkUtils zkUtils) { this.kafkaServer = kafkaServer; this.port = port; this.zkClient = zkClient; this.zkUtils = zkUtils; } static int randomPort() { try (ServerSocket socket = new ServerSocket(0)) { return socket.getLocalPort(); } catch (IOException e) { throw new UncheckedIOException(e); } } public static EmbeddedKafkaServer start(String zkConnectionString) { return start(1, zkConnectionString, randomPort()); } public static EmbeddedKafkaServer start(int brokerId, String zkConnectionString) { return start(brokerId, zkConnectionString, randomPort()); } public static EmbeddedKafkaServer start(int brokerId, String zkConnectionString, int port) { Properties properties = TestUtils .createBrokerConfig( brokerId, // nodeId zkConnectionString, // zkConnect true, // enableControlledShutdown false, // enableDeleteTopic port, // port Option.empty(), // interBrokerSecurityProtocol Option.empty(), // trustStoreFile Option.empty(), // saslProperties true, // enablePlaintext false, // enableSaslPlaintext 0, // saslPlaintextPort false, // enableSsl 0, // sslPort false, // enableSaslSsl, 0, // saslSslPort Option.empty() // rack ); ZkClient zkClient = new ZkClient(zkConnectionString, 6000, 10000, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = ZkUtils .apply( zkClient, // zkClient false // isZkSecurityEnabled ); KafkaServer kafkaServer = TestUtils.createServer(new KafkaConfig(properties), Time.SYSTEM); return new EmbeddedKafkaServer(kafkaServer, port, zkClient, zkUtils); } public int getBrokerId() { return kafkaServer.config().brokerId(); } public int getPort() { return port; } public String getBrokerConnectionString() { return String.format("localhost:%d", getPort()); } public static void createTopic(String topic, int numPartitions, int replicationFactor, EmbeddedKafkaServer... servers) { createTopic(topic, numPartitions, replicationFactor, Arrays.asList(servers)); } public static void createTopic(String topic, int numPartitions, int replicationFactor, List<EmbeddedKafkaServer> servers) { Seq<KafkaServer> kafkaServers = JavaConverters .asScalaBuffer( servers .stream() .map(s -> s.kafkaServer) .collect(Collectors.toList()) ); ZkUtils zkUtils = servers.get(0).zkUtils; TestUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, kafkaServers, new Properties()); } public void stop() { kafkaServer.shutdown(); zkClient.close(); } }
ラップしたクラスを作っていますが、まずは起動から。
static int randomPort() { try (ServerSocket socket = new ServerSocket(0)) { return socket.getLocalPort(); } catch (IOException e) { throw new UncheckedIOException(e); } } public static EmbeddedKafkaServer start(String zkConnectionString) { return start(1, zkConnectionString, randomPort()); } public static EmbeddedKafkaServer start(int brokerId, String zkConnectionString) { return start(brokerId, zkConnectionString, randomPort()); } public static EmbeddedKafkaServer start(int brokerId, String zkConnectionString, int port) { Properties properties = TestUtils .createBrokerConfig( brokerId, // nodeId zkConnectionString, // zkConnect true, // enableControlledShutdown false, // enableDeleteTopic port, // port Option.empty(), // interBrokerSecurityProtocol Option.empty(), // trustStoreFile Option.empty(), // saslProperties true, // enablePlaintext false, // enableSaslPlaintext 0, // saslPlaintextPort false, // enableSsl 0, // sslPort false, // enableSaslSsl, 0, // saslSslPort Option.empty() // rack ); ZkClient zkClient = new ZkClient(zkConnectionString, 6000, 10000, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = ZkUtils .apply( zkClient, // zkClient false // isZkSecurityEnabled ); KafkaServer kafkaServer = TestUtils.createServer(new KafkaConfig(properties), Time.SYSTEM); return new EmbeddedKafkaServer(kafkaServer, port, zkClient, zkUtils); }
デフォルトでランダムなポートを選ぶようにしていますが、これ、外から与えないと起動したApache Kafkaの
Brokerがどのポートを使用しているのかがわかりません。
必要に応じてBrokerIdとポート、Apache ZooKeeperへの接続先を与えて、TestUtilsを使用してBrokerの設定を作成します。
パラメーターがとてもたくさんあるので、コメントに意味を書いています。指定している値は、ほとんどデフォルト値です。
Properties properties = TestUtils .createBrokerConfig( brokerId, // nodeId zkConnectionString, // zkConnect true, // enableControlledShutdown false, // enableDeleteTopic port, // port Option.empty(), // interBrokerSecurityProtocol Option.empty(), // trustStoreFile Option.empty(), // saslProperties true, // enablePlaintext false, // enableSaslPlaintext 0, // saslPlaintextPort false, // enableSsl 0, // sslPort false, // enableSaslSsl, 0, // saslSslPort Option.empty() // rack );
このTestUtils#createBrokerConfig、インターフェースがバージョンで違うみたいなので、気をつけた方が…。
こちらの設定では、データの保存先はApache ZooKeeperの時と同じく一時ディレクトリとなります。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L230
作成したPropertiesを使用して、KafkaServerのインスタンスを作成します。ここでも、TestUtilsクラスを
使用します。
KafkaServer kafkaServer = TestUtils.createServer(new KafkaConfig(properties), Time.SYSTEM);
あと、Topicの作成で使用するので、ZkClientのインスタンス、ZkUtilsのインスタンスを作成しておきます。
ZkClient zkClient = new ZkClient(zkConnectionString, 6000, 10000, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = ZkUtils .apply( zkClient, // zkClient false // isZkSecurityEnabled );
Brokerのidは後からでも取れますが、ポートは後からはわからないので、先ほども書きましたがあらかじめ
覚えておきましょう。
public int getBrokerId() { return kafkaServer.config().brokerId(); } public int getPort() { return port; } public String getBrokerConnectionString() { return String.format("localhost:%d", getPort()); }
Properties…というか、PortPropという項目もありますが、こちらはlistenerに変わっているので使ってはいけません。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/server/KafkaConfig.scala#L243
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/server/KafkaConfig.scala#L428
これで、Apache Kafkaへの接続先が手に入ります。
停止は、KafkaServerに対してshutdownすればOKです。ここでは、一緒にZkClientもクローズしていますが。
public void stop() { kafkaServer.shutdown(); zkClient.close(); }
あと、Topicの作り方ですが、ここでもTestUtilsを使用します。作成したKafkaServerのインスタンスの他に、
Topic名、Partition数、Replication数の指定が必要です。
public static void createTopic(String topic, int numPartitions, int replicationFactor, EmbeddedKafkaServer... servers) { createTopic(topic, numPartitions, replicationFactor, Arrays.asList(servers)); } public static void createTopic(String topic, int numPartitions, int replicationFactor, List<EmbeddedKafkaServer> servers) { Seq<KafkaServer> kafkaServers = JavaConverters .asScalaBuffer( servers .stream() .map(s -> s.kafkaServer) .collect(Collectors.toList()) ); ZkUtils zkUtils = servers.get(0).zkUtils; TestUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, kafkaServers, new Properties()); }
…ここまで、何度かそれっぽいコードが出てきていますが、Apache KafkaがScalaで書かれているので、Javaから
呼び出す際にはそのあたりを頑張る必要があります。
例えば、このあたり。
ScalaのOptionを使う。
Properties properties = TestUtils .createBrokerConfig( brokerId, // nodeId zkConnectionString, // zkConnect true, // enableControlledShutdown false, // enableDeleteTopic port, // port Option.empty(), // interBrokerSecurityProtocol Option.empty(), // trustStoreFile Option.empty(), // saslProperties true, // enablePlaintext false, // enableSaslPlaintext 0, // saslPlaintextPort false, // enableSsl 0, // sslPort false, // enableSaslSsl, 0, // saslSslPort Option.empty() // rack );
Scalaのobjectを使う。
ZkClient zkClient = new ZkClient(zkConnectionString, 6000, 10000, ZKStringSerializer$.MODULE$);
引数にScalaのコレクションを要求される、など。
Seq<KafkaServer> kafkaServers = JavaConverters .asScalaBuffer( servers .stream() .map(s -> s.kafkaServer) .collect(Collectors.toList()) ); ZkUtils zkUtils = servers.get(0).zkUtils; TestUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, kafkaServers, new Properties());
ここまでで、準備完了です。
使ってみる
では、使ってみましょう。
先ほど用意したクラス、あとはApache Kafkaが提供しているEmbeddedZookeeperを使用してApache Kafka、Apache ZooKeeperを
それぞれセットアップすれば、ふつうに使えます。
@Test public void embeddedZooKeeperAndKafkaServer() { EmbeddedZookeeper zkServer = new EmbeddedZookeeper(); String zkConnectionString = "localhost:" + zkServer.port(); EmbeddedKafkaServer kafkaServer = EmbeddedKafkaServer.start(zkConnectionString); try { EmbeddedKafkaServer.createTopic("test-topic", 1, 1, kafkaServer); String brokerConnectionString = kafkaServer.getBrokerConnectionString(); Properties brokerProperties = new Properties(); brokerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionString); brokerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); brokerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); try (KafkaProducer<String, String> producer = new KafkaProducer<>(brokerProperties)) { producer.send(new ProducerRecord<>("test-topic", "key1", "value1")); producer.send(new ProducerRecord<>("test-topic", "key2", "value2")); } Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionString); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) { consumer.subscribe(Arrays.asList("test-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(1000L); if (records.isEmpty()) { continue; } assertThat(records) .hasSize(2); Iterator<ConsumerRecord<String, String>> iterator = records.iterator(); ConsumerRecord<String, String> record1 = iterator.next(); assertThat(record1.key()).isEqualTo("key1"); assertThat(record1.value()).isEqualTo("value1"); ConsumerRecord<String, String> record2 = iterator.next(); assertThat(record2.key()).isEqualTo("key2"); assertThat(record2.value()).isEqualTo("value2"); break; } } } finally { kafkaServer.stop(); zkServer.shutdown(); } }
クラスタ化も可能です。
@Test public void embeddedZooKeeperAndKafkaServerCluster() { EmbeddedZookeeper zkServer = new EmbeddedZookeeper(); String zkConnectionString = "localhost:" + zkServer.port(); List<EmbeddedKafkaServer> kafkaServers = IntStream .rangeClosed(1, 3) .mapToObj(i -> EmbeddedKafkaServer.start(i, zkConnectionString)) .collect(Collectors.toList()); try { EmbeddedKafkaServer.createTopic("test-topic", 3, 2, kafkaServers); String brokerConnectionString = kafkaServers .stream() .map(k -> k.getBrokerConnectionString()) .collect(Collectors.joining(",")); Properties brokerProperties = new Properties(); brokerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionString); brokerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); brokerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); try (KafkaProducer<String, String> producer = new KafkaProducer<>(brokerProperties)) { producer.send(new ProducerRecord<>("test-topic", "key1", "value1")); producer.send(new ProducerRecord<>("test-topic", "key2", "value2")); } Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionString); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) { consumer.subscribe(Arrays.asList("test-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(1000L); if (records.isEmpty()) { continue; } assertThat(records) .hasSize(2); Iterator<ConsumerRecord<String, String>> iterator = records.iterator(); ConsumerRecord<String, String> record1 = iterator.next(); assertThat(record1.key()).isEqualTo("key1"); assertThat(record1.value()).isEqualTo("value1"); ConsumerRecord<String, String> record2 = iterator.next(); assertThat(record2.key()).isEqualTo("key2"); assertThat(record2.value()).isEqualTo("value2"); break; } } } finally { kafkaServers.forEach(EmbeddedKafkaServer::stop); zkServer.shutdown(); } }
ふつうに複数サーバー作ればOKです。
List<EmbeddedKafkaServer> kafkaServers = IntStream .rangeClosed(1, 3) .mapToObj(i -> EmbeddedKafkaServer.start(i, zkConnectionString)) .collect(Collectors.toList());
Topicも、Partition数、Replication数も1以上を指定できるようになります。
EmbeddedKafkaServer.createTopic("test-topic", 3, 2, kafkaServers);
これで動作させればOK、と。
まとめ
Apache KafkaとApache ZooKeeperを、Embeddedに動かしてみました。
読み解くのにちょっと苦労しましたが、落ち着いてテストコードとかTestUtilsとかを読み解いていけばなんとか
なりました。
あえて言えば、Wikiは最新化して欲しかったかも…。
とまあ、こんなの作ったんですが楽をしたかったらSpring Kafkaを使ったらいいのかもしれません。
https://github.com/spring-projects/spring-kafka/blob/v2.0.0.M3/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java
やってること、ほとんど同じです。これに気づいたの、だいぶあとでしたが(笑)。