CLOVER🍀

That was when it all began.

Apache Kafkaに、JavaScript(Node.js/kafka-node)からアクセスしてみる

Apache Kafkaに対して、たまにはJava以外からもアクセスしてみようかなと、JavaScriptのクライアントを試してみることにしました。

今回使ったのは、こちら。

kafka-node - npm

GitHub - SOHU-Co/kafka-node: Node.js client for Apache Kafka 0.8 and later.

Producer、Consuemrなどは、それなりに使える感じみたいです。機能的には、こういうところがアピールされています。

  • Producer/HighLevelProducer、Consumer/HighLevelConsumer(今はConsumerGroup)が使える
  • Node Stream Producer/Consumer(ConsumerGroupStream)が使える
  • オフセットが管理できる
  • Apache ZooKeeperなしで、直接Apache Kafka Brokerに接続できる

最初の方は、機能的な説明すぎてよくわからないのですが…。

ドキュメントはGitHubのREADMEか、npm側に載っているものそのままのどちらかを見ればいい感じです。

構成

Apache KafkaおよびApache ZooKeeperは、以下の構成とします。

  • Apache ZooKeeper … 1台(172.21.0.2)
  • Apache Kafka … 3台(172.21.0.3〜5)

ProducerおよびConsumerはひとつずつとし、kafka-nodeの

  • Producer/Consumer
  • HighLevelProducer/ConsumerGroup

を使ってそれぞれプログラムを書いて試してみたいと思います。

準備

環境。

$ node -v
v8.2.1
$ npm -v
5.3.0

まずは、kafka-nodeをインストールします。

$ npm install --save kafka-node

コードの実行とテストには、AVAとChaiを使うことにしました。

$ npm install --save-dev ava chai

GitHub - avajs/ava: 🚀 Futuristic JavaScript test runner

Chai

package.jsonのscripts要素は、こうしておきました。

  "scripts": {
    "test": "ava -v -u"
  },


Apache ZooKeeperおよびApache Kafkaは、起動済み/クラスタ構成済みとします。

Topicは、あらかじめ作成しておきましょう。2パターンのProducer/Consumerを使うので、Topicもそれぞれ別に作成することにします。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic1
$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic2

Partition数は3、Replication Factorは2です。

kafka-nodeのAPIを使ってTopicの作成もできるのですが、どうもオプションが少なさそうだったので、コマンドで作成としました。

では、こちらを使って書いていきます。

テストコードの雛形

テストコードの雛形は、このように準備。
test/getting-started.js

import test from "ava";
import chai from "chai";
import kafka from "kafka-node";

chai.should();

// ここに、テストを書く!!
}

kafka-nodeに絡むところは、importくらいですが。

Producer/Consumer

では、まずはProducer/Consumerを使ってみます。

Producer

このあとでHighLevelProducerというのも使うのですが、その差はどうもPartitionの扱いのようです。メッセージをApache Kafkaに送る際に、
どのPartitionに送るのかを強く意識するのがこのProducerみたいです。

書いたコードは、こちら。

test.serial.cb("simple producer", t => {
    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient({
        kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"
    });
    const producer = new Producer(client, {
        partitionerType: 1
    });

    producer.on("ready", () => {
        const payloads = [
            {
                topic: "my-topic1",
                messages: JSON.stringify({name: "フグ田サザエ", age: 24})
            },
            {
                topic: "my-topic1",
                messages: [
                    JSON.stringify({name: "磯野カツオ", age: 11}),
                    JSON.stringify({name: "磯野ワカメ", age: 9})
                ]
            }
        ];

        producer.send(payloads, (err, data) => {
            t.end();
        });
    });
});

まずは、KafkaClientとProducerを作成。KafkaClientは、最低限接続先があればよさげな。

    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient({
        kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"
    });
    const producer = new Producer(client, {
        partitionerType: 0
    });

Client

Producerの引数は、KafkaClientとオプション(任意)。partitionTypeで、Partitionへの振り分け方が決まるみたいです。

{
    // Configuration for when to consider a message as acknowledged, default 1
    requireAcks: 1,
    // The amount of time in milliseconds to wait for all acks before considered, default 100ms
    ackTimeoutMs: 100,
    // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
    partitionerType: 2
}

「default」はなんなんでしょう…。今回は、「default」を選択…すると、続いてのsendと合わせてメッセージが偏ることに…。

Producer#on "ready"になると、Producerが利用可能になるのでProducer#sendでメッセージを送信します。

    producer.on("ready", () => {
        const payloads = [
            {
                topic: "my-topic1",
                messages: JSON.stringify({name: "フグ田サザエ", age: 24})
            },
            {
                topic: "my-topic1",
                messages: [
                    JSON.stringify({name: "磯野カツオ", age: 11}),
                    JSON.stringify({name: "磯野ワカメ", age: 9})
                ]
            }
        ];

        producer.send(payloads, (err, data) => {
            t.end();
        });
    });

Payloadとして設定するmessagesには、送るメッセージが単数の場合はそのまま、複数であれば配列で渡せばOKです。

        const payloads = [
            {
                topic: "my-topic1",
                messages: JSON.stringify({name: "フグ田サザエ", age: 24})
            },
            {
                topic: "my-topic1",
                messages: [
                    JSON.stringify({name: "磯野カツオ", age: 11}),
                    JSON.stringify({name: "磯野ワカメ", age: 9})
                ]
            }
        ];

今回は指定していませんが、Payloadの他のオプションとしてキーやPartitionのIDも指定することができます。

Partition IDは省略すると0で、今回はクラスタを組んだ割には全部0のPartitionにメッセージが行きました…。メッセージ登録先のPartitionを
固定しちゃうみたいですね。

また、メッセージ自体はStringに変換されてTopicに入るようで、今回はJSON#stringifyしていますが、しなかったら

[object Object]

とかがTopicに入って、悲しいことになりました。

で、今度はTopicから読み出してみましょう。Consumerです。

GitHub - SOHU-Co/kafka-node: Node.js client for Apache Kafka 0.8 and later.

できあがったコードは、こちら。

test.serial.cb("simple consumer", t => {
    const Consumer = kafka.Consumer;
    const client = new kafka.KafkaClient({
        kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"
    });
    const consumer = new Consumer(
        client,
        [{topic: "my-topic1", offset: 0}],
        {
            groupId: "simple-consumer1",
            autoCommit: true,
            fromOffset: true
        }
    );

    const receivedMessages = new Set();

    consumer.on("message", message => {
        receivedMessages.add(message.value);

        if (receivedMessages.size == 3) {
            const sortedValues = Array.from(receivedMessages.values());
            sortedValues.sort((a, b) => JSON.parse(a).age < JSON.parse(b).age);

            const message1 = sortedValues[0];
            const v1 = JSON.parse(message1);
            v1.name.should.equal("フグ田サザエ");
            v1.age.should.equal(24);

            const message2 = sortedValues[1];
            const v2 = JSON.parse(message2);
            v2.name.should.equal("磯野カツオ");
            v2.age.should.equal(11);

            const message3 = sortedValues[2];
            const v3 = JSON.parse(message3);
            v3.name.should.equal("磯野ワカメ");
            v3.age.should.equal(9);

            t.end();
        }
    });
});

こちらでは、KafkaClientとConsumerを使います。

    const Consumer = kafka.Consumer;
    const client = new kafka.KafkaClient({
        kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"
    });
    const consumer = new Consumer(
        client,
        [{topic: "my-topic1", offset: 0}],
        {
            groupId: "simple-consumer1",
            autoCommit: true,
            fromOffset: "earliest"
        }
    );

Consumerでは、groupId(いわゆるConsumer Group)やコミットの設定などができます。

あとは、メッセージをConsumer#onで受信して確認…と。

    const receivedMessages = new Set();

    consumer.on("message", message => {
        receivedMessages.add(message.value);

        if (receivedMessages.size == 3) {
            const sortedValues = Array.from(receivedMessages.values());
            sortedValues.sort((a, b) => JSON.parse(a).age < JSON.parse(b).age);

            const message1 = sortedValues[0];
            const v1 = JSON.parse(message1);
            v1.name.should.equal("フグ田サザエ");
            v1.age.should.equal(24);

            const message2 = sortedValues[1];
            const v2 = JSON.parse(message2);
            v2.name.should.equal("磯野カツオ");
            v2.age.should.equal(11);

            const message3 = sortedValues[2];
            const v3 = JSON.parse(message3);
            v3.name.should.equal("磯野ワカメ");
            v3.age.should.equal(9);

            t.end();
        }
    });

なんですが、格納するPartitionがバラけると、うまく読んでくれない感じが…。

HighLevelProducer/ConsumerGroup

続いては、HighLevelProducerとConsumerGroupを使ってみます。

まずはHighLevelProducerから。
HighLevelProducer

書いたソースコードは、こちら。

test.serial.cb("high-level producer", t => {
    const HighLevelProducer = kafka.HighLevelProducer;
    const client = new kafka.KafkaClient({
        kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"
    });
    const producer = new HighLevelProducer(client);

    producer.on("ready", () => {
        const payloads = [
            {
                topic: "my-topic2",
                messages: JSON.stringify({name: "フグ田サザエ", age: 24})
            },
            {
                topic: "my-topic2",
                messages: [
                    JSON.stringify({name: "磯野カツオ", age: 11}),
                    JSON.stringify({name: "磯野ワカメ", age: 9})
                ]
            }
        ];

        producer.send(payloads, (err, data) => {
            t.end();
        });
    });
});

パッと見、Producerの時とほぼ変わりません。

    const HighLevelProducer = kafka.HighLevelProducer;
    const client = new kafka.KafkaClient({
        kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"
    });
    const producer = new HighLevelProducer(client);

HighLevelProducerの引数として、KafkaClient以外にオプションも渡せるようですが、これはProducerの時と同じです。

READMEを読むと、Producer/HighLevelProducerそれぞれの引数であるKafkaClientに対しての説明が微妙に違います。

  • Producer … client: client which keeps a connection with the Kafka server.
  • HighLevelProducer … client: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions


Partitionに対して、ラウンドロビンでリクエストを送る…と?

あとはほとんどProducerの時と変わりませんが、HighLevelProducer#sendsの時にPartition IDを指定できなくなっているみたいです。

HighLevelって、どういうことでしょう…。

続いて、ConsumerGroup。
ConsumerGroup

HighLevelConsumerというのもあるみたいですが、今はこちらを使うべきだそうな。

コードは、こちらもConsumerと似たような感じに。

test.serial.cb("consumer-group", t => {
    const ConsumerGroup = kafka.ConsumerGroup;
    const client = new kafka.KafkaClient({
        kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"
    });
    const consumerGroup = new ConsumerGroup(
        {
            host: "172.21.0.2:2181",
            kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092",
            groupId: "consumer-group",
            protocol: ['roundrobin'],
            fromOffset: "earliest"
        },
        ["my-topic2"]
    );

    const receivedMessages = new Set();

    consumerGroup.on("message", message => {
        receivedMessages.add(message.value);

        if (receivedMessages.size == 3) {
            const sortedValues = Array.from(receivedMessages.values());
            sortedValues.sort((a, b) => JSON.parse(a).age < JSON.parse(b).age);

            const message1 = sortedValues[0];
            const v1 = JSON.parse(message1);
            v1.name.should.equal("フグ田サザエ");
            v1.age.should.equal(24);

            const message2 = sortedValues[1];
            const v2 = JSON.parse(message2);
            v2.name.should.equal("磯野カツオ");
            v2.age.should.equal(11);

            const message3 = sortedValues[2];
            const v3 = JSON.parse(message3);
            v3.name.should.equal("磯野ワカメ");
            v3.age.should.equal(9);

            t.end();
        }
    });
});

オプションに指定する内容が、ちょっとConsumerとは違うみたいです。

コードの見た目はほとんど変わりませんが、複数Partitionにメッセージがバラけても、こちらだとさらっと読んでくれました…。
なんなんでしょう…。

まとめ

Apache KafkaのNode.jsクライアント、kafka-nodeを使ってJavaScriptからApache Kafkaにアクセスしてみました。

ライブラリに慣れていないのもさることながら、言語にも慣れていないのがだいぶネックに…。

Javaでやる方が楽なんですけど、どうなんでしょうねー。ドキュメントはもうちょっと欲しいかもしれません…(HighLevelとそうでない方の差が
とてもわからない…)。

あと、ProducerStreamとConsumerGroupStreamは外しました。

今回はここまでです。