Apache Kafkaに対して、たまにはJava以外からもアクセスしてみようかなと、JavaScriptのクライアントを試してみることにしました。
今回使ったのは、こちら。
GitHub - SOHU-Co/kafka-node: Node.js client for Apache Kafka 0.8 and later.
Producer、Consuemrなどは、それなりに使える感じみたいです。機能的には、こういうところがアピールされています。
- Producer/HighLevelProducer、Consumer/HighLevelConsumer(今はConsumerGroup)が使える
- Node Stream Producer/Consumer(ConsumerGroupStream)が使える
- オフセットが管理できる
- Apache ZooKeeperなしで、直接Apache Kafka Brokerに接続できる
最初の方は、機能的な説明すぎてよくわからないのですが…。
ドキュメントはGitHubのREADMEか、npm側に載っているものそのままのどちらかを見ればいい感じです。
構成
Apache KafkaおよびApache ZooKeeperは、以下の構成とします。
ProducerおよびConsumerはひとつずつとし、kafka-nodeの
- Producer/Consumer
- HighLevelProducer/ConsumerGroup
を使ってそれぞれプログラムを書いて試してみたいと思います。
準備
環境。
$ node -v v8.2.1 $ npm -v 5.3.0
まずは、kafka-nodeをインストールします。
$ npm install --save kafka-node
コードの実行とテストには、AVAとChaiを使うことにしました。
$ npm install --save-dev ava chai
GitHub - avajs/ava: 🚀 Futuristic JavaScript test runner
package.jsonのscripts要素は、こうしておきました。
"scripts": { "test": "ava -v -u" },
Apache ZooKeeperおよびApache Kafkaは、起動済み/クラスタ構成済みとします。
Topicは、あらかじめ作成しておきましょう。2パターンのProducer/Consumerを使うので、Topicもそれぞれ別に作成することにします。
$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic1 $ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic2
Partition数は3、Replication Factorは2です。
kafka-nodeのAPIを使ってTopicの作成もできるのですが、どうもオプションが少なさそうだったので、コマンドで作成としました。
では、こちらを使って書いていきます。
テストコードの雛形
テストコードの雛形は、このように準備。
test/getting-started.js
import test from "ava"; import chai from "chai"; import kafka from "kafka-node"; chai.should(); // ここに、テストを書く!! }
kafka-nodeに絡むところは、importくらいですが。
Producer/Consumer
では、まずはProducer/Consumerを使ってみます。
このあとでHighLevelProducerというのも使うのですが、その差はどうもPartitionの扱いのようです。メッセージをApache Kafkaに送る際に、
どのPartitionに送るのかを強く意識するのがこのProducerみたいです。
書いたコードは、こちら。
test.serial.cb("simple producer", t => { const Producer = kafka.Producer; const client = new kafka.KafkaClient({ kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092" }); const producer = new Producer(client, { partitionerType: 1 }); producer.on("ready", () => { const payloads = [ { topic: "my-topic1", messages: JSON.stringify({name: "フグ田サザエ", age: 24}) }, { topic: "my-topic1", messages: [ JSON.stringify({name: "磯野カツオ", age: 11}), JSON.stringify({name: "磯野ワカメ", age: 9}) ] } ]; producer.send(payloads, (err, data) => { t.end(); }); }); });
まずは、KafkaClientとProducerを作成。KafkaClientは、最低限接続先があればよさげな。
const Producer = kafka.Producer; const client = new kafka.KafkaClient({ kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092" }); const producer = new Producer(client, { partitionerType: 0 });
Producerの引数は、KafkaClientとオプション(任意)。partitionTypeで、Partitionへの振り分け方が決まるみたいです。
{ // Configuration for when to consider a message as acknowledged, default 1 requireAcks: 1, // The amount of time in milliseconds to wait for all acks before considered, default 100ms ackTimeoutMs: 100, // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0 partitionerType: 2 }
「default」はなんなんでしょう…。今回は、「default」を選択…すると、続いてのsendと合わせてメッセージが偏ることに…。
Producer#on "ready"になると、Producerが利用可能になるのでProducer#sendでメッセージを送信します。
producer.on("ready", () => { const payloads = [ { topic: "my-topic1", messages: JSON.stringify({name: "フグ田サザエ", age: 24}) }, { topic: "my-topic1", messages: [ JSON.stringify({name: "磯野カツオ", age: 11}), JSON.stringify({name: "磯野ワカメ", age: 9}) ] } ]; producer.send(payloads, (err, data) => { t.end(); }); });
Payloadとして設定するmessagesには、送るメッセージが単数の場合はそのまま、複数であれば配列で渡せばOKです。
const payloads = [ { topic: "my-topic1", messages: JSON.stringify({name: "フグ田サザエ", age: 24}) }, { topic: "my-topic1", messages: [ JSON.stringify({name: "磯野カツオ", age: 11}), JSON.stringify({name: "磯野ワカメ", age: 9}) ] } ];
今回は指定していませんが、Payloadの他のオプションとしてキーやPartitionのIDも指定することができます。
Partition IDは省略すると0で、今回はクラスタを組んだ割には全部0のPartitionにメッセージが行きました…。メッセージ登録先のPartitionを
固定しちゃうみたいですね。
また、メッセージ自体はStringに変換されてTopicに入るようで、今回はJSON#stringifyしていますが、しなかったら
[object Object]
とかがTopicに入って、悲しいことになりました。
で、今度はTopicから読み出してみましょう。Consumerです。
GitHub - SOHU-Co/kafka-node: Node.js client for Apache Kafka 0.8 and later.
できあがったコードは、こちら。
test.serial.cb("simple consumer", t => { const Consumer = kafka.Consumer; const client = new kafka.KafkaClient({ kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092" }); const consumer = new Consumer( client, [{topic: "my-topic1", offset: 0}], { groupId: "simple-consumer1", autoCommit: true, fromOffset: true } ); const receivedMessages = new Set(); consumer.on("message", message => { receivedMessages.add(message.value); if (receivedMessages.size == 3) { const sortedValues = Array.from(receivedMessages.values()); sortedValues.sort((a, b) => JSON.parse(a).age < JSON.parse(b).age); const message1 = sortedValues[0]; const v1 = JSON.parse(message1); v1.name.should.equal("フグ田サザエ"); v1.age.should.equal(24); const message2 = sortedValues[1]; const v2 = JSON.parse(message2); v2.name.should.equal("磯野カツオ"); v2.age.should.equal(11); const message3 = sortedValues[2]; const v3 = JSON.parse(message3); v3.name.should.equal("磯野ワカメ"); v3.age.should.equal(9); t.end(); } }); });
こちらでは、KafkaClientとConsumerを使います。
const Consumer = kafka.Consumer; const client = new kafka.KafkaClient({ kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092" }); const consumer = new Consumer( client, [{topic: "my-topic1", offset: 0}], { groupId: "simple-consumer1", autoCommit: true, fromOffset: "earliest" } );
Consumerでは、groupId(いわゆるConsumer Group)やコミットの設定などができます。
あとは、メッセージをConsumer#onで受信して確認…と。
const receivedMessages = new Set(); consumer.on("message", message => { receivedMessages.add(message.value); if (receivedMessages.size == 3) { const sortedValues = Array.from(receivedMessages.values()); sortedValues.sort((a, b) => JSON.parse(a).age < JSON.parse(b).age); const message1 = sortedValues[0]; const v1 = JSON.parse(message1); v1.name.should.equal("フグ田サザエ"); v1.age.should.equal(24); const message2 = sortedValues[1]; const v2 = JSON.parse(message2); v2.name.should.equal("磯野カツオ"); v2.age.should.equal(11); const message3 = sortedValues[2]; const v3 = JSON.parse(message3); v3.name.should.equal("磯野ワカメ"); v3.age.should.equal(9); t.end(); } });
なんですが、格納するPartitionがバラけると、うまく読んでくれない感じが…。
HighLevelProducer/ConsumerGroup
続いては、HighLevelProducerとConsumerGroupを使ってみます。
まずはHighLevelProducerから。
HighLevelProducer
書いたソースコードは、こちら。
test.serial.cb("high-level producer", t => { const HighLevelProducer = kafka.HighLevelProducer; const client = new kafka.KafkaClient({ kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092" }); const producer = new HighLevelProducer(client); producer.on("ready", () => { const payloads = [ { topic: "my-topic2", messages: JSON.stringify({name: "フグ田サザエ", age: 24}) }, { topic: "my-topic2", messages: [ JSON.stringify({name: "磯野カツオ", age: 11}), JSON.stringify({name: "磯野ワカメ", age: 9}) ] } ]; producer.send(payloads, (err, data) => { t.end(); }); }); });
パッと見、Producerの時とほぼ変わりません。
const HighLevelProducer = kafka.HighLevelProducer; const client = new kafka.KafkaClient({ kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092" }); const producer = new HighLevelProducer(client);
HighLevelProducerの引数として、KafkaClient以外にオプションも渡せるようですが、これはProducerの時と同じです。
READMEを読むと、Producer/HighLevelProducerそれぞれの引数であるKafkaClientに対しての説明が微妙に違います。
- Producer … client: client which keeps a connection with the Kafka server.
- HighLevelProducer … client: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions
Partitionに対して、ラウンドロビンでリクエストを送る…と?
あとはほとんどProducerの時と変わりませんが、HighLevelProducer#sendsの時にPartition IDを指定できなくなっているみたいです。
HighLevelって、どういうことでしょう…。
続いて、ConsumerGroup。
ConsumerGroup
HighLevelConsumerというのもあるみたいですが、今はこちらを使うべきだそうな。
コードは、こちらもConsumerと似たような感じに。
test.serial.cb("consumer-group", t => { const ConsumerGroup = kafka.ConsumerGroup; const client = new kafka.KafkaClient({ kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092" }); const consumerGroup = new ConsumerGroup( { host: "172.21.0.2:2181", kafkaHost: "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092", groupId: "consumer-group", protocol: ['roundrobin'], fromOffset: "earliest" }, ["my-topic2"] ); const receivedMessages = new Set(); consumerGroup.on("message", message => { receivedMessages.add(message.value); if (receivedMessages.size == 3) { const sortedValues = Array.from(receivedMessages.values()); sortedValues.sort((a, b) => JSON.parse(a).age < JSON.parse(b).age); const message1 = sortedValues[0]; const v1 = JSON.parse(message1); v1.name.should.equal("フグ田サザエ"); v1.age.should.equal(24); const message2 = sortedValues[1]; const v2 = JSON.parse(message2); v2.name.should.equal("磯野カツオ"); v2.age.should.equal(11); const message3 = sortedValues[2]; const v3 = JSON.parse(message3); v3.name.should.equal("磯野ワカメ"); v3.age.should.equal(9); t.end(); } }); });
オプションに指定する内容が、ちょっとConsumerとは違うみたいです。
コードの見た目はほとんど変わりませんが、複数Partitionにメッセージがバラけても、こちらだとさらっと読んでくれました…。
なんなんでしょう…。
まとめ
Apache KafkaのNode.jsクライアント、kafka-nodeを使ってJavaScriptからApache Kafkaにアクセスしてみました。
ライブラリに慣れていないのもさることながら、言語にも慣れていないのがだいぶネックに…。
Javaでやる方が楽なんですけど、どうなんでしょうねー。ドキュメントはもうちょっと欲しいかもしれません…(HighLevelとそうでない方の差が
とてもわからない…)。
あと、ProducerStreamとConsumerGroupStreamは外しました。
今回はここまでです。