ふと、RabbitMQに対して、JavaScript(Node.js)クライアントからアクセスしてみたくなりまして。
RabbitMQのチュートリアルにある、こちらの2つを自分でも動かしてみることにしました。
RabbitMQ - RabbitMQ tutorial - Work Queues
RabbitMQ - RabbitMQ tutorial - Publish/Subscribe
Work Queueでは2つのConsumerが交互にメッセージを受け取るパターン、Publish/Subscribeでは2つのConsumerが同じメッセージを受け取るパターンの
チュートリアルです。
環境
Node.jsのバージョンは、以下のとおり。
$ node -v v8.4.0 $ npm -v 5.3.0
RabbitMQへのアクセスは、amqp.nodeを使います。
インストール。
$ npm install --save amqplib
package.jsonでは、このように。
"dependencies": { "amqplib": "^0.5.1" }
RabbitMQについては、次の前提とします。
amqp.node
今回使用する、amqp.nodeはこちらです。
GitHub - squaremo/amqp.node: AMQP 0-9-1 library and client for Node.JS
APIリファレンスは、こちら。
amqplib | Channel API reference
RabbitMQのチュートリアルではCallback APIを使っているのですが、今回はPromiseベースでいきたいと思います。
チュートリアルとは異なり、requireが以下のようになります。
const amqp = require("amqplib");
チュートリアルでは、次のようになっています。
var amqp = require('amqplib/callback_api');
「callback_api」ですね。
Work queues
では、「Work queues」のチュートリアルを書いてみます。
Publisher
まずは、送信側から。
send-queue.js
const amqp = require("amqplib"); const args = process.argv.slice(2); const queueName = args[0]; const message = args[1]; const logger = (fun) => console.log(`[${new Date()}] ${fun.call(null)}`); const p = amqp.connect("amqp://kazuhira:password@172.17.0.2:5672") .then(connection => connection.createChannel() .then(channel => { channel.assertQueue(queueName, { durable: true }); channel.sendToQueue(queueName, Buffer.from(JSON.stringify({ message: message, time: new Date() }))); logger(() => `send message [${message}]`); return Promise.resolve(channel); }) .then(channel => Promise.resolve([channel, connection])) ); p.then(resources => setTimeout(() => { const channel = resources[0]; const connection = resources[1]; channel.close().then(() => connection.close()).then( () => logger(() => "sender, finish") ); }, 500) );
基本的にはチュートリアルに似たものですが、メッセージをJSONで送りつつ日時も入れているところがポイントでしょうか。
あと、durableもtrueにしています。
起動引数は、第1引数がQueueの名前、第2引数が送るメッセージです。
最後にConnectionを切断する際に、setTimeoutを使っていますが
p.then(resources => setTimeout(() => { const channel = resources[0]; const connection = resources[1]; channel.close().then(() => connection.close()).then( () => logger(() => "sender, finish") ); }, 500) );
これ、いきなり切断すると次のようにエラーが出力されるからです。
Unhandled rejection Error: Channel ended, no reply will be forthcoming
やっていることは、単にメッセージを送っているだけです。
Consumer
続いて、受信側。
receive-queue.js
const amqp = require("amqplib"); const args = process.argv.slice(2); const queueName = args[0]; const logger = (fun) => console.log(`[${new Date()}] ${fun.call(null)}`); const p = amqp.connect("amqp://kazuhira:password@172.17.0.2:5672") .then(connection => connection.createChannel() .then(channel => { channel.assertQueue(queueName, { durable: true }); channel.prefetch(1); logger(() => `[*] Waiting for messages in ${queueName}. To exit press CTRL+C`); channel.consume(queueName, message => { logger(() => `receive message [${message.content.toString()}]`); channel.ack(message); }, { noAck: false }); }));
複数Consumerを紐付けるので、Channel#prefetchは1としました。またメッセージを読んだことは自分で伝えるようにするため、明示的にackを呼び出すように
しています。
起動引数は、Queueの名前のみです。
動作確認
それでは、動かしてみましょう。
まずは受信側を2つ起動しておきます。
## Cunsumer 1 $ node receive-queue.js hello-queue [Wed Aug 30 2017 23:55:58 GMT+0900 (JST)] [*] Waiting for messages in hello-queue. To exit press CTRL+C ## Consumer 2 $ node receive-queue.js hello-queue [Wed Aug 30 2017 23:56:09 GMT+0900 (JST)] [*] Waiting for messages in hello-queue. To exit press CTRL+C
Comsumer側は、Ctrl+Cを押すまでは終了せず、待機状態になります。
では、送信側を起動。メッセージを4回送ってみます。
$ node send-queue.js hello-queue Hello [Wed Aug 30 2017 23:57:03 GMT+0900 (JST)] send message [Hello] [Wed Aug 30 2017 23:57:04 GMT+0900 (JST)] sender, finish $ node send-queue.js hello-queue Hello1 [Wed Aug 30 2017 23:57:05 GMT+0900 (JST)] send message [Hello1] [Wed Aug 30 2017 23:57:06 GMT+0900 (JST)] sender, finish $ node send-queue.js hello-queue Hello2 [Wed Aug 30 2017 23:57:08 GMT+0900 (JST)] send message [Hello2] [Wed Aug 30 2017 23:57:09 GMT+0900 (JST)] sender, finish $ node send-queue.js hello-queue Hello3 [Wed Aug 30 2017 23:57:10 GMT+0900 (JST)] send message [Hello3] [Wed Aug 30 2017 23:57:11 GMT+0900 (JST)] sender, finish
受信側。
## Consumer 1 [Wed Aug 30 2017 23:57:03 GMT+0900 (JST)] receive message [{"message":"Hello","time":"2017-08-30T14:57:03.505Z"}] [Wed Aug 30 2017 23:57:08 GMT+0900 (JST)] receive message [{"message":"Hello2","time":"2017-08-30T14:57:08.637Z"}] ## Consumer 2 [Wed Aug 30 2017 23:57:05 GMT+0900 (JST)] receive message [{"message":"Hello1","time":"2017-08-30T14:57:05.969Z"}] [Wed Aug 30 2017 23:57:10 GMT+0900 (JST)] receive message [{"message":"Hello3","time":"2017-08-30T14:57:10.717Z"}]
メッセージがひとつずつ、順番に受信していますね。2つのConsumer間で、同じメッセージは受信していません。
OKそうですね。
Publish/Subscribe
続いて、Publish/Subscribeの方へ。
送信側
では、こちらもまずは送信側から。
publish.js
const amqp = require("amqplib"); const args = process.argv.slice(2); const exchangeName = args[0]; const message = args[1]; const logger = (fun) => console.log(`[${new Date()}] ${fun.call(null)}`); const p = amqp.connect("amqp://kazuhira:password@172.17.0.2:5672") .then(connection => connection.createChannel() .then(channel => { channel.assertExchange(exchangeName, "fanout", { durable: true }); channel.publish(exchangeName, "", Buffer.from(JSON.stringify({ message: message, time: new Date() }))); logger(() => `send message [${message}]`); return Promise.resolve(channel); }) .then(channel => Promise.resolve([channel, connection])) ); p.then(resources => setTimeout(() => { const channel = resources[0]; const connection = resources[1]; channel.close().then(() => connection.close()).then( () => logger(() => "sender, finish") ); }, 500) );
起動引数は、Exchangeの名前とメッセージ。Exchangeを「fanout」で作成しているくらいで、あとはWork Queueの時とほとんど同じ構成です。
メッセージの送信はChannel#publishで、第2引数はルーティングキーですが、今回は空です。
受信側
受信側はこちら。
subscribe.js
const amqp = require("amqplib"); const args = process.argv.slice(2); const exchangeName = args[0]; const message = args[1]; const logger = (fun) => console.log(`[${new Date()}] ${fun.call(null)}`); const p = amqp.connect("amqp://kazuhira:password@172.17.0.2:5672") .then(connection => connection.createChannel() .then(channel => { channel.assertExchange(exchangeName, "fanout", { durable: true }); channel.assertQueue("", { exclusive: true }) .then(queue => { logger(() => `[*] Waiting for messages in ${queue.queue}. To exit press CTRL+C`); channel.bindQueue(queue.queue, exchangeName, ""); channel.consume(queue.queue, message => { logger(() => `receive message [${message.content.toString()}]`) channel.ack(message); }, { noAck: false}) }); }) );
受信側は、Exchange名が起動引数です。
Channel#asseertQueueでランダムな名前のQueueを作成し、バインドします。このQueueは、接続ごとに作成される一時的なQueueとなります。
Channel#consumeでのメッセージ受信は、Work Queueの時と同じです。
確認
では、確認してみましょう。
最初に、受信側を2つ起動します。
## Consumer 1 $ node subscribe.js hello-topic [Thu Aug 31 2017 00:11:32 GMT+0900 (JST)] [*] Waiting for messages in amq.gen-gaCz0qJb3lqcvHixGcwIoA. To exit press CTRL+C ## Consumer 2 $ node subscribe.js hello-topic [Thu Aug 31 2017 00:11:43 GMT+0900 (JST)] [*] Waiting for messages in amq.gen-pbW0MWXzM0fksFKLOA0KGg. To exit press CTRL+C
Queueの名前がランダムな感じですね。
では、メッセージを送ってみます。
$ node publish.js hello-topic Hello1 [Thu Aug 31 2017 00:12:17 GMT+0900 (JST)] send message [Hello1] [Thu Aug 31 2017 00:12:17 GMT+0900 (JST)] sender, finish $ node publish.js hello-topic Hello2 [Thu Aug 31 2017 00:12:19 GMT+0900 (JST)] send message [Hello2] [Thu Aug 31 2017 00:12:20 GMT+0900 (JST)] sender, finish $ node publish.js hello-topic Hello3 [Thu Aug 31 2017 00:12:22 GMT+0900 (JST)] send message [Hello3] [Thu Aug 31 2017 00:12:23 GMT+0900 (JST)] sender, finish $ node publish.js hello-topic Hello4 [Thu Aug 31 2017 00:12:25 GMT+0900 (JST)] send message [Hello4] [Thu Aug 31 2017 00:12:25 GMT+0900 (JST)] sender, finish
受信側。
## Consumer 1 [Thu Aug 31 2017 00:12:17 GMT+0900 (JST)] receive message [{"message":"Hello1","time":"2017-08-30T15:12:17.376Z"}] [Thu Aug 31 2017 00:12:19 GMT+0900 (JST)] receive message [{"message":"Hello2","time":"2017-08-30T15:12:19.988Z"}] [Thu Aug 31 2017 00:12:22 GMT+0900 (JST)] receive message [{"message":"Hello3","time":"2017-08-30T15:12:22.899Z"}] [Thu Aug 31 2017 00:12:25 GMT+0900 (JST)] receive message [{"message":"Hello4","time":"2017-08-30T15:12:25.154Z"}] ## Consumer 2 [Thu Aug 31 2017 00:12:17 GMT+0900 (JST)] receive message [{"message":"Hello1","time":"2017-08-30T15:12:17.376Z"}] [Thu Aug 31 2017 00:12:19 GMT+0900 (JST)] receive message [{"message":"Hello2","time":"2017-08-30T15:12:19.988Z"}] [Thu Aug 31 2017 00:12:22 GMT+0900 (JST)] receive message [{"message":"Hello3","time":"2017-08-30T15:12:22.899Z"}] [Thu Aug 31 2017 00:12:25 GMT+0900 (JST)] receive message [{"message":"Hello4","time":"2017-08-30T15:12:25.154Z"}]
2つのConsumerが、両方とも同じメッセージを受信しています。こちらもOKですね。
まとめ
RabbitMQのJavaScriptクライアントを、Node.jsに慣れるのも兼ねてちょっと試してみました。
JavaScriptに不慣れなのでだいぶてこずりましたが、まあ動かせてよかったですかな…。