CLOVER🍀

That was when it all began.

RabbitMQのJavaScriptクライアントで遊ぶ

ふと、RabbitMQに対して、JavaScript(Node.js)クライアントからアクセスしてみたくなりまして。

RabbitMQのチュートリアルにある、こちらの2つを自分でも動かしてみることにしました。

RabbitMQ - RabbitMQ tutorial - Work Queues

RabbitMQ - RabbitMQ tutorial - Publish/Subscribe

Work Queueでは2つのConsumerが交互にメッセージを受け取るパターン、Publish/Subscribeでは2つのConsumerが同じメッセージを受け取るパターンの
チュートリアルです。

環境

Node.jsのバージョンは、以下のとおり。

$ node -v
v8.4.0
$ npm -v
5.3.0

RabbitMQへのアクセスは、amqp.nodeを使います。

インストール。

$ npm install --save amqplib

package.jsonでは、このように。

  "dependencies": {
    "amqplib": "^0.5.1"
  }

RabbitMQについては、次の前提とします。

  • インストール済みかつ起動済み
  • バージョン: 3.6.11-1
  • IPアドレス: 172.17.0.2
  • アカウント; kazuhira/password

amqp.node

今回使用する、amqp.nodeはこちらです。
GitHub - squaremo/amqp.node: AMQP 0-9-1 library and client for Node.JS

APIリファレンスは、こちら。
amqplib | Channel API reference

RabbitMQのチュートリアルではCallback APIを使っているのですが、今回はPromiseベースでいきたいと思います。

チュートリアルとは異なり、requireが以下のようになります。

const amqp = require("amqplib");

チュートリアルでは、次のようになっています。

var amqp = require('amqplib/callback_api');

「callback_api」ですね。

Work queues

では、「Work queues」のチュートリアルを書いてみます。

Publisher

まずは、送信側から。

send-queue.js

const amqp = require("amqplib");

const args = process.argv.slice(2);
const queueName = args[0];
const message = args[1];

const logger = (fun) => console.log(`[${new Date()}] ${fun.call(null)}`);

const p =
          amqp.connect("amqp://kazuhira:password@172.17.0.2:5672")
          .then(connection =>
                connection.createChannel()
                .then(channel => {
                    channel.assertQueue(queueName, { durable: true });
                    channel.sendToQueue(queueName, Buffer.from(JSON.stringify({ message: message, time: new Date() })));

                    logger(() => `send message [${message}]`);

                    return Promise.resolve(channel);
                })
                .then(channel => Promise.resolve([channel, connection]))
               );

p.then(resources =>
    setTimeout(() => {
               const channel = resources[0];
               const connection = resources[1];
               channel.close().then(() => connection.close()).then(
                   () => logger(() => "sender, finish")
               );
              }, 500)
);

基本的にはチュートリアルに似たものですが、メッセージをJSONで送りつつ日時も入れているところがポイントでしょうか。
あと、durableもtrueにしています。

起動引数は、第1引数がQueueの名前、第2引数が送るメッセージです。

最後にConnectionを切断する際に、setTimeoutを使っていますが

p.then(resources =>
    setTimeout(() => {
               const channel = resources[0];
               const connection = resources[1];
               channel.close().then(() => connection.close()).then(
                   () => logger(() => "sender, finish")
               );
              }, 500)
);

これ、いきなり切断すると次のようにエラーが出力されるからです。

Unhandled rejection Error: Channel ended, no reply will be forthcoming

やっていることは、単にメッセージを送っているだけです。

Consumer

続いて、受信側。
receive-queue.js

const amqp = require("amqplib");

const args = process.argv.slice(2);
const queueName = args[0];

const logger = (fun) => console.log(`[${new Date()}] ${fun.call(null)}`);

const p =
          amqp.connect("amqp://kazuhira:password@172.17.0.2:5672")
          .then(connection =>
               connection.createChannel()
               .then(channel => {
                   channel.assertQueue(queueName, { durable: true });
                   channel.prefetch(1);

                   logger(() => `[*] Waiting for messages in ${queueName}. To exit press CTRL+C`);

                   channel.consume(queueName,
                                   message => {
                                       logger(() => `receive message [${message.content.toString()}]`);
                                       channel.ack(message);
                                   }, { noAck: false });
               }));

複数Consumerを紐付けるので、Channel#prefetchは1としました。またメッセージを読んだことは自分で伝えるようにするため、明示的にackを呼び出すように
しています。

起動引数は、Queueの名前のみです。

動作確認

それでは、動かしてみましょう。

まずは受信側を2つ起動しておきます。

## Cunsumer 1
$ node receive-queue.js hello-queue
[Wed Aug 30 2017 23:55:58 GMT+0900 (JST)] [*] Waiting for messages in hello-queue. To exit press CTRL+C

## Consumer 2
$ node receive-queue.js hello-queue
[Wed Aug 30 2017 23:56:09 GMT+0900 (JST)] [*] Waiting for messages in hello-queue. To exit press CTRL+C

Comsumer側は、Ctrl+Cを押すまでは終了せず、待機状態になります。

では、送信側を起動。メッセージを4回送ってみます。

$ node send-queue.js hello-queue Hello
[Wed Aug 30 2017 23:57:03 GMT+0900 (JST)] send message [Hello]
[Wed Aug 30 2017 23:57:04 GMT+0900 (JST)] sender, finish

$ node send-queue.js hello-queue Hello1
[Wed Aug 30 2017 23:57:05 GMT+0900 (JST)] send message [Hello1]
[Wed Aug 30 2017 23:57:06 GMT+0900 (JST)] sender, finish

$ node send-queue.js hello-queue Hello2
[Wed Aug 30 2017 23:57:08 GMT+0900 (JST)] send message [Hello2]
[Wed Aug 30 2017 23:57:09 GMT+0900 (JST)] sender, finish

$ node send-queue.js hello-queue Hello3
[Wed Aug 30 2017 23:57:10 GMT+0900 (JST)] send message [Hello3]
[Wed Aug 30 2017 23:57:11 GMT+0900 (JST)] sender, finish

受信側。

## Consumer 1
[Wed Aug 30 2017 23:57:03 GMT+0900 (JST)] receive message [{"message":"Hello","time":"2017-08-30T14:57:03.505Z"}]
[Wed Aug 30 2017 23:57:08 GMT+0900 (JST)] receive message [{"message":"Hello2","time":"2017-08-30T14:57:08.637Z"}]

## Consumer 2
[Wed Aug 30 2017 23:57:05 GMT+0900 (JST)] receive message [{"message":"Hello1","time":"2017-08-30T14:57:05.969Z"}]
[Wed Aug 30 2017 23:57:10 GMT+0900 (JST)] receive message [{"message":"Hello3","time":"2017-08-30T14:57:10.717Z"}]

メッセージがひとつずつ、順番に受信していますね。2つのConsumer間で、同じメッセージは受信していません。

OKそうですね。

Publish/Subscribe

続いて、Publish/Subscribeの方へ。

送信側

では、こちらもまずは送信側から。
publish.js

const amqp = require("amqplib");

const args = process.argv.slice(2);
const exchangeName = args[0];
const message = args[1];

const logger = (fun) => console.log(`[${new Date()}] ${fun.call(null)}`);

const p =
          amqp.connect("amqp://kazuhira:password@172.17.0.2:5672")
          .then(connection =>
                connection.createChannel()
                .then(channel => {
                    channel.assertExchange(exchangeName, "fanout", { durable: true });
                    channel.publish(exchangeName, "", Buffer.from(JSON.stringify({ message: message, time: new Date() })));

                    logger(() => `send message [${message}]`);

                    return Promise.resolve(channel);
                })
                .then(channel => Promise.resolve([channel, connection]))
               );

p.then(resources => 
       setTimeout(() => {
           const channel = resources[0];
           const connection = resources[1];
           channel.close().then(() => connection.close()).then(
               () => logger(() => "sender, finish")
           );
       }, 500)
);

起動引数は、Exchangeの名前とメッセージ。Exchangeを「fanout」で作成しているくらいで、あとはWork Queueの時とほとんど同じ構成です。
メッセージの送信はChannel#publishで、第2引数はルーティングキーですが、今回は空です。

受信側

受信側はこちら。
subscribe.js

const amqp = require("amqplib");

const args = process.argv.slice(2);
const exchangeName = args[0];
const message = args[1];

const logger = (fun) => console.log(`[${new Date()}] ${fun.call(null)}`);

const p =
          amqp.connect("amqp://kazuhira:password@172.17.0.2:5672")
          .then(connection =>
                connection.createChannel()
                .then(channel => {
                    channel.assertExchange(exchangeName, "fanout", { durable: true });
                    channel.assertQueue("", { exclusive: true })
                    .then(queue => {
                        logger(() => `[*] Waiting for messages in ${queue.queue}. To exit press CTRL+C`);
                        channel.bindQueue(queue.queue, exchangeName, "");

                        channel.consume(queue.queue,
                                        message => {
                                            logger(() => `receive message [${message.content.toString()}]`)
                                            channel.ack(message);
                                        }, { noAck: false})
                    });
                })
               );

受信側は、Exchange名が起動引数です。

Channel#asseertQueueでランダムな名前のQueueを作成し、バインドします。このQueueは、接続ごとに作成される一時的なQueueとなります。
Channel#consumeでのメッセージ受信は、Work Queueの時と同じです。

確認

では、確認してみましょう。

最初に、受信側を2つ起動します。

## Consumer 1
$ node subscribe.js hello-topic
[Thu Aug 31 2017 00:11:32 GMT+0900 (JST)] [*] Waiting for messages in amq.gen-gaCz0qJb3lqcvHixGcwIoA. To exit press CTRL+C

## Consumer 2
$ node subscribe.js hello-topic
[Thu Aug 31 2017 00:11:43 GMT+0900 (JST)] [*] Waiting for messages in amq.gen-pbW0MWXzM0fksFKLOA0KGg. To exit press CTRL+C

Queueの名前がランダムな感じですね。

では、メッセージを送ってみます。

$ node publish.js hello-topic Hello1
[Thu Aug 31 2017 00:12:17 GMT+0900 (JST)] send message [Hello1]
[Thu Aug 31 2017 00:12:17 GMT+0900 (JST)] sender, finish

$ node publish.js hello-topic Hello2
[Thu Aug 31 2017 00:12:19 GMT+0900 (JST)] send message [Hello2]
[Thu Aug 31 2017 00:12:20 GMT+0900 (JST)] sender, finish

$ node publish.js hello-topic Hello3
[Thu Aug 31 2017 00:12:22 GMT+0900 (JST)] send message [Hello3]
[Thu Aug 31 2017 00:12:23 GMT+0900 (JST)] sender, finish

$ node publish.js hello-topic Hello4
[Thu Aug 31 2017 00:12:25 GMT+0900 (JST)] send message [Hello4]
[Thu Aug 31 2017 00:12:25 GMT+0900 (JST)] sender, finish

受信側。

## Consumer 1
[Thu Aug 31 2017 00:12:17 GMT+0900 (JST)] receive message [{"message":"Hello1","time":"2017-08-30T15:12:17.376Z"}]
[Thu Aug 31 2017 00:12:19 GMT+0900 (JST)] receive message [{"message":"Hello2","time":"2017-08-30T15:12:19.988Z"}]
[Thu Aug 31 2017 00:12:22 GMT+0900 (JST)] receive message [{"message":"Hello3","time":"2017-08-30T15:12:22.899Z"}]
[Thu Aug 31 2017 00:12:25 GMT+0900 (JST)] receive message [{"message":"Hello4","time":"2017-08-30T15:12:25.154Z"}]

## Consumer 2
[Thu Aug 31 2017 00:12:17 GMT+0900 (JST)] receive message [{"message":"Hello1","time":"2017-08-30T15:12:17.376Z"}]
[Thu Aug 31 2017 00:12:19 GMT+0900 (JST)] receive message [{"message":"Hello2","time":"2017-08-30T15:12:19.988Z"}]
[Thu Aug 31 2017 00:12:22 GMT+0900 (JST)] receive message [{"message":"Hello3","time":"2017-08-30T15:12:22.899Z"}]
[Thu Aug 31 2017 00:12:25 GMT+0900 (JST)] receive message [{"message":"Hello4","time":"2017-08-30T15:12:25.154Z"}]

2つのConsumerが、両方とも同じメッセージを受信しています。こちらもOKですね。

まとめ

RabbitMQのJavaScriptクライアントを、Node.jsに慣れるのも兼ねてちょっと試してみました。

JavaScriptに不慣れなのでだいぶてこずりましたが、まあ動かせてよかったですかな…。