CLOVER🍀

That was when it all began.

Apache Kafka(とApache ZooKeeper)をEmbeddedに使う

最近、Apache Kafkaで遊んでいますが、こう使っているとEmbedded(組み込み)で動かしてみたくなるもの。テストとかで
使ったりとか。

なんかできそうだという感じだったので、チャレンジしてみました。

こちらに書いてありましたので。

Unit testing / How do I write unit tests using Kafka?

…と思ったのですが、最終的にこちらの内容はあまり使わなくなりました…。

目標

Apache Kafka(とApache Kafkaが必要とするApache ZooKeeperも)を、Unit Testコード内でEmbedddedに動かします。
テストコード内で、Brokerをクラスタ化するところまでを目標にします。

準備

Maven依存関係は、こちら。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>

けっこういろいろ要ります。Classifierがtestのものが必要なことに注意です。

あとは、テストコード用にJUnitとAssertJを追加します。

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.8.0</version>
            <scope>test</scope>
        </dependency>

準備はこんなところです。

Wikiに載っているApache Curatorは、不要です。
Apache Curator –

Apache Kafka自身が組み込みでApache ZooKeeperを持っています。

もしもApache Kafkaと一緒に使う場合は、Apache Kafkaが依存しているApache ZooKeeperとメジャーバージョンは合わせておきましょう。
https://github.com/apache/curator/blob/apache-curator-2.12.0/pom.xml#L64

下手に最新版を持ってくると、失敗します…。

テストコードの雛形

テストコードの雛形は、こんな感じにします。
src/test/java/org/littlewings/kafka/EmbeddedKafkaTest.java

package org.littlewings.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class EmbeddedKafkaTest {
    // ここに、テストを書く!!
}

Apache ZooKeeperをEmbeddedに使う

まずは、Apache ZooKeeperがないと始まりません。Apache KafkaのWikiでは、Apache ZooKeeperをEmbeddedに使うために
Apache Curatorを使っていますが、Apache Kafka自身がEmbeddedにApache ZooKeeperを使う方法を提供しているので、
そちらを使用すればOKです。

https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala

かなり簡単に作れます。

        EmbeddedZookeeper zkServer = new EmbeddedZookeeper();

        String zkConnectionString = "localhost:" + zkServer.port();

停止は、shutdownです。

            zkServer.shutdown();

ポートは、ランダムに割り当てられます(TestUtils#RandomPortというのは0です)。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala#L34

データ、ログディレクトリは一時ディレクトリで、シャットダウン時に削除されます。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala#L29-L30

Apache ZooKeeperはこんなところです。

Apache KafkaをEmbeddedに使う

こちらも、Wikiとけっこう違っていてだいぶソースコードを追いました。

参考にしたり、中身を追ったりしたのはこちら。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/server/KafkaConfig.scala
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/utils/ZkUtils.scala

で、できあがったコードはこちら。
src/test/java/org/littlewings/kafka/EmbeddedKafkaServer.java

package org.littlewings.kafka;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.common.utils.Time;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class EmbeddedKafkaServer {
    KafkaServer kafkaServer;
    int port;
    ZkClient zkClient;
    ZkUtils zkUtils;

    public EmbeddedKafkaServer(KafkaServer kafkaServer, int port, ZkClient zkClient, ZkUtils zkUtils) {
        this.kafkaServer = kafkaServer;
        this.port = port;
        this.zkClient = zkClient;
        this.zkUtils = zkUtils;
    }

    static int randomPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static EmbeddedKafkaServer start(String zkConnectionString) {
        return start(1, zkConnectionString, randomPort());
    }

    public static EmbeddedKafkaServer start(int brokerId, String zkConnectionString) {
        return start(brokerId, zkConnectionString, randomPort());
    }

    public static EmbeddedKafkaServer start(int brokerId, String zkConnectionString, int port) {
        Properties properties =
                TestUtils
                        .createBrokerConfig(
                                brokerId,  // nodeId
                                zkConnectionString,  // zkConnect
                                true,  //     enableControlledShutdown
                                false,  // enableDeleteTopic
                                port, // port
                                Option.empty(), // interBrokerSecurityProtocol
                                Option.empty(),  // trustStoreFile
                                Option.empty(),  // saslProperties
                                true, // enablePlaintext
                                false,  // enableSaslPlaintext
                                0, // saslPlaintextPort
                                false,  // enableSsl
                                0,  // sslPort
                                false,  // enableSaslSsl,
                                0,  // saslSslPort
                                Option.empty()  // rack
                        );

        ZkClient zkClient = new ZkClient(zkConnectionString, 6000, 10000, ZKStringSerializer$.MODULE$);

        ZkUtils zkUtils =
                ZkUtils
                        .apply(
                                zkClient,  // zkClient
                                false  // isZkSecurityEnabled
                        );

        KafkaServer kafkaServer = TestUtils.createServer(new KafkaConfig(properties), Time.SYSTEM);

        return new EmbeddedKafkaServer(kafkaServer, port, zkClient, zkUtils);
    }

    public int getBrokerId() {
        return kafkaServer.config().brokerId();
    }

    public int getPort() {
        return port;
    }

    public String getBrokerConnectionString() {
        return String.format("localhost:%d", getPort());
    }

    public static void createTopic(String topic, int numPartitions, int replicationFactor, EmbeddedKafkaServer... servers) {
        createTopic(topic, numPartitions, replicationFactor, Arrays.asList(servers));
    }

    public static void createTopic(String topic, int numPartitions, int replicationFactor, List<EmbeddedKafkaServer> servers) {
        Seq<KafkaServer> kafkaServers =
                JavaConverters
                        .asScalaBuffer(
                                servers
                                        .stream()
                                        .map(s -> s.kafkaServer)
                                        .collect(Collectors.toList())
                        );

        ZkUtils zkUtils = servers.get(0).zkUtils;

        TestUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, kafkaServers, new Properties());
    }

    public void stop() {
        kafkaServer.shutdown();
        zkClient.close();
    }
}

ラップしたクラスを作っていますが、まずは起動から。

    static int randomPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static EmbeddedKafkaServer start(String zkConnectionString) {
        return start(1, zkConnectionString, randomPort());
    }

    public static EmbeddedKafkaServer start(int brokerId, String zkConnectionString) {
        return start(brokerId, zkConnectionString, randomPort());
    }

    public static EmbeddedKafkaServer start(int brokerId, String zkConnectionString, int port) {
        Properties properties =
                TestUtils
                        .createBrokerConfig(
                                brokerId,  // nodeId
                                zkConnectionString,  // zkConnect
                                true,  //     enableControlledShutdown
                                false,  // enableDeleteTopic
                                port, // port
                                Option.empty(), // interBrokerSecurityProtocol
                                Option.empty(),  // trustStoreFile
                                Option.empty(),  // saslProperties
                                true, // enablePlaintext
                                false,  // enableSaslPlaintext
                                0, // saslPlaintextPort
                                false,  // enableSsl
                                0,  // sslPort
                                false,  // enableSaslSsl,
                                0,  // saslSslPort
                                Option.empty()  // rack
                        );

        ZkClient zkClient = new ZkClient(zkConnectionString, 6000, 10000, ZKStringSerializer$.MODULE$);

        ZkUtils zkUtils =
                ZkUtils
                        .apply(
                                zkClient,  // zkClient
                                false  // isZkSecurityEnabled
                        );

        KafkaServer kafkaServer = TestUtils.createServer(new KafkaConfig(properties), Time.SYSTEM);

        return new EmbeddedKafkaServer(kafkaServer, port, zkClient, zkUtils);
    }

デフォルトでランダムなポートを選ぶようにしていますが、これ、外から与えないと起動したApache Kafkaの
Brokerがどのポートを使用しているのかがわかりません

必要に応じてBrokerIdとポート、Apache ZooKeeperへの接続先を与えて、TestUtilsを使用してBrokerの設定を作成します。
パラメーターがとてもたくさんあるので、コメントに意味を書いています。指定している値は、ほとんどデフォルト値です。

        Properties properties =
                TestUtils
                        .createBrokerConfig(
                                brokerId,  // nodeId
                                zkConnectionString,  // zkConnect
                                true,  //     enableControlledShutdown
                                false,  // enableDeleteTopic
                                port, // port
                                Option.empty(), // interBrokerSecurityProtocol
                                Option.empty(),  // trustStoreFile
                                Option.empty(),  // saslProperties
                                true, // enablePlaintext
                                false,  // enableSaslPlaintext
                                0, // saslPlaintextPort
                                false,  // enableSsl
                                0,  // sslPort
                                false,  // enableSaslSsl,
                                0,  // saslSslPort
                                Option.empty()  // rack
                        );

このTestUtils#createBrokerConfig、インターフェースがバージョンで違うみたいなので、気をつけた方が…。

こちらの設定では、データの保存先はApache ZooKeeperの時と同じく一時ディレクトリとなります。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L230

作成したPropertiesを使用して、KafkaServerのインスタンスを作成します。ここでも、TestUtilsクラスを
使用します。

        KafkaServer kafkaServer = TestUtils.createServer(new KafkaConfig(properties), Time.SYSTEM);

あと、Topicの作成で使用するので、ZkClientのインスタンス、ZkUtilsのインスタンスを作成しておきます。

        ZkClient zkClient = new ZkClient(zkConnectionString, 6000, 10000, ZKStringSerializer$.MODULE$);

        ZkUtils zkUtils =
                ZkUtils
                        .apply(
                                zkClient,  // zkClient
                                false  // isZkSecurityEnabled
                        );

Brokerのidは後からでも取れますが、ポートは後からはわからないので、先ほども書きましたがあらかじめ
覚えておきましょう。

    public int getBrokerId() {
        return kafkaServer.config().brokerId();
    }

    public int getPort() {
        return port;
    }

    public String getBrokerConnectionString() {
        return String.format("localhost:%d", getPort());
    }

Properties…というか、PortPropという項目もありますが、こちらはlistenerに変わっているので使ってはいけません。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/server/KafkaConfig.scala#L243
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/server/KafkaConfig.scala#L428

これで、Apache Kafkaへの接続先が手に入ります。

停止は、KafkaServerに対してshutdownすればOKです。ここでは、一緒にZkClientもクローズしていますが。

    public void stop() {
        kafkaServer.shutdown();
        zkClient.close();
    }

あと、Topicの作り方ですが、ここでもTestUtilsを使用します。作成したKafkaServerのインスタンスの他に、
Topic名、Partition数、Replication数の指定が必要です。

    public static void createTopic(String topic, int numPartitions, int replicationFactor, EmbeddedKafkaServer... servers) {
        createTopic(topic, numPartitions, replicationFactor, Arrays.asList(servers));
    }

    public static void createTopic(String topic, int numPartitions, int replicationFactor, List<EmbeddedKafkaServer> servers) {
        Seq<KafkaServer> kafkaServers =
                JavaConverters
                        .asScalaBuffer(
                                servers
                                        .stream()
                                        .map(s -> s.kafkaServer)
                                        .collect(Collectors.toList())
                        );

        ZkUtils zkUtils = servers.get(0).zkUtils;

        TestUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, kafkaServers, new Properties());
    }

…ここまで、何度かそれっぽいコードが出てきていますが、Apache KafkaがScalaで書かれているので、Javaから
呼び出す際にはそのあたりを頑張る必要があります。

例えば、このあたり。

ScalaのOptionを使う。

        Properties properties =
                TestUtils
                        .createBrokerConfig(
                                brokerId,  // nodeId
                                zkConnectionString,  // zkConnect
                                true,  //     enableControlledShutdown
                                false,  // enableDeleteTopic
                                port, // port
                                Option.empty(), // interBrokerSecurityProtocol
                                Option.empty(),  // trustStoreFile
                                Option.empty(),  // saslProperties
                                true, // enablePlaintext
                                false,  // enableSaslPlaintext
                                0, // saslPlaintextPort
                                false,  // enableSsl
                                0,  // sslPort
                                false,  // enableSaslSsl,
                                0,  // saslSslPort
                                Option.empty()  // rack
                        );

Scalaのobjectを使う。

ZkClient zkClient = new ZkClient(zkConnectionString, 6000, 10000, ZKStringSerializer$.MODULE$);

引数にScalaのコレクションを要求される、など。

        Seq<KafkaServer> kafkaServers =
                JavaConverters
                        .asScalaBuffer(
                                servers
                                        .stream()
                                        .map(s -> s.kafkaServer)
                                        .collect(Collectors.toList())
                        );

        ZkUtils zkUtils = servers.get(0).zkUtils;

        TestUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, kafkaServers, new Properties());

ここまでで、準備完了です。

使ってみる

では、使ってみましょう。

先ほど用意したクラス、あとはApache Kafkaが提供しているEmbeddedZookeeperを使用してApache Kafka、Apache ZooKeeperを
それぞれセットアップすれば、ふつうに使えます。

    @Test
    public void embeddedZooKeeperAndKafkaServer() {
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper();

        String zkConnectionString = "localhost:" + zkServer.port();

        EmbeddedKafkaServer kafkaServer = EmbeddedKafkaServer.start(zkConnectionString);

        try {
            EmbeddedKafkaServer.createTopic("test-topic", 1, 1, kafkaServer);

            String brokerConnectionString = kafkaServer.getBrokerConnectionString();

            Properties brokerProperties = new Properties();
            brokerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionString);
            brokerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            brokerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(brokerProperties)) {
                producer.send(new ProducerRecord<>("test-topic", "key1", "value1"));
                producer.send(new ProducerRecord<>("test-topic", "key2", "value2"));
            }

            Properties consumerProperties = new Properties();
            consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionString);
            consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList("test-topic"));

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000L);

                    if (records.isEmpty()) {
                        continue;
                    }

                    assertThat(records)
                            .hasSize(2);

                    Iterator<ConsumerRecord<String, String>> iterator = records.iterator();

                    ConsumerRecord<String, String> record1 = iterator.next();
                    assertThat(record1.key()).isEqualTo("key1");
                    assertThat(record1.value()).isEqualTo("value1");

                    ConsumerRecord<String, String> record2 = iterator.next();
                    assertThat(record2.key()).isEqualTo("key2");
                    assertThat(record2.value()).isEqualTo("value2");

                    break;
                }
            }
        } finally {
            kafkaServer.stop();
            zkServer.shutdown();
        }
    }

クラスタ化も可能です。

    @Test
    public void embeddedZooKeeperAndKafkaServerCluster() {
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper();

        String zkConnectionString = "localhost:" + zkServer.port();

        List<EmbeddedKafkaServer> kafkaServers =
                IntStream
                        .rangeClosed(1, 3)
                        .mapToObj(i -> EmbeddedKafkaServer.start(i, zkConnectionString))
                        .collect(Collectors.toList());

        try {
            EmbeddedKafkaServer.createTopic("test-topic", 3, 2, kafkaServers);

            String brokerConnectionString =
                    kafkaServers
                            .stream()
                            .map(k -> k.getBrokerConnectionString())
                            .collect(Collectors.joining(","));

            Properties brokerProperties = new Properties();
            brokerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionString);
            brokerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            brokerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(brokerProperties)) {
                producer.send(new ProducerRecord<>("test-topic", "key1", "value1"));
                producer.send(new ProducerRecord<>("test-topic", "key2", "value2"));
            }

            Properties consumerProperties = new Properties();
            consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionString);
            consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Arrays.asList("test-topic"));

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000L);

                    if (records.isEmpty()) {
                        continue;
                    }

                    assertThat(records)
                            .hasSize(2);

                    Iterator<ConsumerRecord<String, String>> iterator = records.iterator();

                    ConsumerRecord<String, String> record1 = iterator.next();
                    assertThat(record1.key()).isEqualTo("key1");
                    assertThat(record1.value()).isEqualTo("value1");

                    ConsumerRecord<String, String> record2 = iterator.next();
                    assertThat(record2.key()).isEqualTo("key2");
                    assertThat(record2.value()).isEqualTo("value2");

                    break;
                }
            }
        } finally {
            kafkaServers.forEach(EmbeddedKafkaServer::stop);
            zkServer.shutdown();
        }
    }

ふつうに複数サーバー作ればOKです。

        List<EmbeddedKafkaServer> kafkaServers =
                IntStream
                        .rangeClosed(1, 3)
                        .mapToObj(i -> EmbeddedKafkaServer.start(i, zkConnectionString))
                        .collect(Collectors.toList());

Topicも、Partition数、Replication数も1以上を指定できるようになります。

            EmbeddedKafkaServer.createTopic("test-topic", 3, 2, kafkaServers);

これで動作させればOK、と。

まとめ

Apache KafkaとApache ZooKeeperを、Embeddedに動かしてみました。

読み解くのにちょっと苦労しましたが、落ち着いてテストコードとかTestUtilsとかを読み解いていけばなんとか
なりました。

あえて言えば、Wikiは最新化して欲しかったかも…。

とまあ、こんなの作ったんですが楽をしたかったらSpring Kafkaを使ったらいいのかもしれません。
https://github.com/spring-projects/spring-kafka/blob/v2.0.0.M3/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java

やってること、ほとんど同じです。これに気づいたの、だいぶあとでしたが(笑)。