これは、なにをしたくて書いたもの?
RabbitMQのチュートリアルをJavaScriptクライアント+TypeScriptでやっていこう、ということで。
今回は「Publish/Subscribe」を扱います。こちらですね。
RabbitMQ tutorial - Publish/Subscribe — RabbitMQ
Publish/Subscribe
「Publish/Subscribe」では、複数のコンシューマーにメッセージを配信します。
RabbitMQ tutorial - Publish/Subscribe — RabbitMQ
これは、Publish/Subscribeというパターンとして知られているものです。
このチュートリアルでは、単純なログシステムを構築します。2つの役割から構成され、ひとつはログメッセージの送信する
プロデューサー、もうひとつはメッセージを受信するコンシューマーです。
つまり、メッセージがコンシューマーにブロードキャスト的に送信されるわけです。
ここでのポイントは、プロデューサーは直接キューにメッセージを送信するのではなく、Exchangeを1度挟むということです。
Exchangeを作成する時に、タイプを指定することができます。
ch.assertExchange('logs', 'fanout', {durable: false})
Exchangeには、direct
、topic
、headers
、fanout
の4つのタイプから選択できます。
AMQP 0-9-1 Model Explained / Exchanges and Exchange Types
fanout
は、Exchangeにバインドされているすべてのキューにメッセージのコピーを配信するタイプです。
AMQP 0-9-1 Model Explained / Exchanges and Exchange Types / Fanout Exchange
これでメッセージのブロードキャストが行われることになります。
メッセージの送信時には、Channel#publish
の第1引数にExchangeの名前、第2引数にキューの名前を指定しますが、キューの名前は
空にしておきます。
channel.publish( 'logs' , '' , Buffer.from( 'Hello World!' ));
これは、特定のキューにメッセージを送信するわけではなく、Exchangeに送信したいことだけを宣言するものです。
ではキューはどうするかというと、コンシューマーに一時的なキューを作成してもらいます。
一時的なキューを作成するには、Channel#assertQueue
のキュー名の部分を空文字列にします。
channel.assertQueue('', { exclusive: true });
これで、amq.gen-JzTY20BRgKO-HjmUJj0wLg
のようなランダムな名前を持ったキューが作成されます。また、exclusive
をtrue
に
することで排他的なキューを表し、コンシューマーが接続を切断すると一時キューは削除されます。
一時キューを作成する他の方法としては、TTLや自動削除の機能を使うものもあるようです。
ここまででファンアウトExchangeと一時キューができたので、この2つを以下で関連付けます。
channel.bindQueue(queue_name, 'logs', '');
これをバインディングと呼ぶようです。
では、ここまでの内容をNode.js+TypeScriptで試してみます。
環境
今回の環境は、こちら。
$ sudo -u rabbitmq rabbitmqctl version 3.11.12
RabbitMQは、172.17.0.2で動作しているものとします。
ユーザーも作成しておきます。
$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password $ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*' $ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring
使用するNode.jsのバージョン。
$ node --version v18.15.0 $ npm --version 9.5.0
準備
では、まずはNode.jsプロジェクトを作成します。
$ npm init -y $ npm i -D typescript $ npm i -D @types/node@v18 $ npm i -D prettier $ npm i -D jest @types/jest $ npm i -D esbuild esbuild-jest
確認は、テストコードで行います。
RabbitMQに接続するためのamqplibとその型定義をインストール。
$ npm i amqplib $ npm i -D @types/amqplib
$ mkdir src test
依存関係は、こうなりました。
"devDependencies": { "@types/amqplib": "^0.10.1", "@types/jest": "^29.5.0", "@types/node": "^18.15.11", "esbuild": "^0.17.15", "esbuild-jest": "^0.5.0", "jest": "^29.5.0", "prettier": "^2.8.7", "typescript": "^5.0.3" }, "dependencies": { "amqplib": "^0.10.3" }
scripts
。
"scripts": { "build": "tsc --project .", "build:watch": "tsc --project . --watch", "typecheck": "tsc --project ./tsconfig.typecheck.json", "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch", "test": "jest", "format": "prettier --write src test" },
各種設定ファイル。
tsconfig.json
{ "compilerOptions": { "target": "esnext", "module": "commonjs", "moduleResolution": "node", "lib": ["esnext"], "baseUrl": "./src", "outDir": "dist", "strict": true, "forceConsistentCasingInFileNames": true, "noFallthroughCasesInSwitch": true, "noImplicitOverride": true, "noImplicitReturns": true, "noPropertyAccessFromIndexSignature": true, "esModuleInterop": true }, "include": [ "src" ] }
tsconfig.typecheck.json
{ "extends": "./tsconfig", "compilerOptions": { "baseUrl": "./", "noEmit": true }, "include": [ "src", "test" ] }
.prettierrc.json
{ "singleQuote": true, "printWidth": 120 }
jest.config.js
module.exports = { testEnvironment: 'node', transform: { "^.+\\.tsx?$": "esbuild-jest" } };
RabbitMQのJavaScriptチュートリアルの「Publish/Subscribe」をTypeScriptとJestで試す
では、こちらをTypeScriptとJestで試していきます。
RabbitMQ tutorial - Publish/Subscribe — RabbitMQ
チュートリアル内ではプロデューサーとコンシューマーをそれぞれ別プロセスで起動していますが、今回はテストコード内で表現して
みます。
プロデューサーを作成する
まずはプロデューサーを作成します。今回はプロデューサーもコンシューマーもクラスとして作成します。
src/Producer.ts
import amqp from 'amqplib'; import { log } from './log'; export class Producer { private conn: amqp.Connection; private channel: amqp.Channel; private exchange: string; constructor(conn: amqp.Connection, channel: amqp.Channel, exchange: string) { this.conn = conn; this.channel = channel; this.exchange = exchange; } static async create(url: string, exchange: string): Promise<Producer> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); await channel.assertExchange(exchange, 'fanout', { durable: false }); return new Producer(conn, channel, exchange); } send(message: string): void { this.channel.publish(this.exchange, '', Buffer.from(message)); log(` [x] Send '${message}'`); } async close(): Promise<void> { await this.conn.close(); } }
プロデューサーのポイントは、ここですね。Channel#assertExchange
でExchangeの存在確認を行い、存在していなければ作成します。
await channel.assertExchange(exchange, 'fanout', { durable: false });
Channel-oriented API reference / API reference / Channel / assertExchange
第1引数はExchangeの名前です。第2引数はExchangeのタイプで、ブロードキャストとなるfanout
を指定します。
また、durable
をfalse
にしているので、このExchangeはRabbitMQブローカーが再起動すると失われます。
メッセージをExchangeに送信しているのは、こちら。Channel#publish
を使います。
this.channel.publish(this.exchange, '', Buffer.from(message));
Channel-oriented API reference / API reference / Channel / publish
第1引数はExchangeの名前です。第2引数はルーティングキーで、キューへのルーティングを決めるものなのですが、ここで
空の文字列(''
)を指定することでルーティング先のキューを指定しないことになります。
ところで、log
という関数は以下の定義です。
src/log.ts
export function log(message: string): void { console.log(`[${new Date().toISOString()}] ${message}`); }
こちらはコンシューマーでも使います。
コンシューマーを作成する
次は、コンシューマーを作成します。
src/Consumer.ts
import amqp from 'amqplib'; import { log } from './log'; export class Consumer { private name: string; private conn: amqp.Connection; private channel: amqp.Channel; private exchange: string; private queue: string; private receivedMessages: string[] = []; constructor(name: string, conn: amqp.Connection, channel: amqp.Channel, exchange: string, queue: string) { this.name = name; this.conn = conn; this.channel = channel; this.exchange = exchange; this.queue = queue; } static async create(name: string, url: string, exchange: string): Promise<Consumer> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); await channel.assertExchange(exchange, 'fanout', { durable: false }); const queue = await channel.assertQueue('', { exclusive: true }); return new Consumer(name, conn, channel, exchange, queue.queue); } async start(): Promise<void> { await this.channel.bindQueue(this.queue, this.exchange, ''); await this.channel.consume( this.queue, async (message) => { const messageAsString = message!.content.toString(); log(` [${this.name}] Received, ${messageAsString}`); this.receivedMessages.push(messageAsString); }, { noAck: true, } ); log(`waiting for message, for queue, ${this.queue}`); } getReceivedMessage(): string[] { return [...this.receivedMessages]; } async close(): Promise<void> { await this.conn.close(); } }
プロデューサーと同様にChannel#assertExchange
とした後に、Channel#assertQueue
を呼び出します。
await channel.assertExchange(exchange, 'fanout', { durable: false }); const queue = await channel.assertQueue('', { exclusive: true });
Channel-oriented API reference / API reference / Channel / assertQueue
ここで、キューの名前を空文字列(''
)で指定しているところがポイントで、こうするとamq.gen-JzTY20BRgKO-HjmUJj0wLg
ような
ランダムな名前でキューが作成されます。
またexclusive
をtrue
にしているので、これは排他的な一時キューとなり、接続がクローズされると削除されるものになります。
そして、この一時キューとExchangeを紐付けます。それがChannel#bindQueue
です。
await this.channel.bindQueue(this.queue, this.exchange, '');
Channel-oriented API reference / API reference / Channel / bindQueue
この時、一時キューの名前とExchangeの名前を指定します。最後の引数はパターンですが、ここは空文字列を指定します。
あとはメッセージの受信です。
await this.channel.consume( this.queue, async (message) => { const messageAsString = message!.content.toString(); log(` [${this.name}] Received, ${messageAsString}`); this.receivedMessages.push(messageAsString); }, { noAck: true, } );
今回はnoAck
をtrue
にして、メッセージを受信したらその時点でメッセージをブローカーから削除する動作にしています。
これで、プロデューサーとコンシューマーができました。
テストコードで確認する
では、テストコードで確認してみます。
test/publish-subscribe.test.ts
import { setTimeout } from 'timers/promises'; import { Consumer } from '../src/Consumer'; import { Producer } from '../src/Producer'; test('publish subscribe test', async () => { const url = 'amqp://kazuhira:password@172.17.0.2:5672'; const exchange = 'logs'; const producer = await Producer.create(url, exchange); const consumer1 = await Consumer.create('consumer1', url, exchange); const consumer2 = await Consumer.create('consumer2', url, exchange); try { await consumer1.start(); await consumer2.start(); producer.send('Hello World'); await setTimeout(2 * 1000); expect(consumer1.getReceivedMessage()).toEqual(['Hello World']); expect(consumer2.getReceivedMessage()).toEqual(['Hello World']); producer.send('Hello RabbitMQ'); await setTimeout(2 * 1000); expect(consumer1.getReceivedMessage()).toEqual(['Hello World', 'Hello RabbitMQ']); expect(consumer2.getReceivedMessage()).toEqual(['Hello World', 'Hello RabbitMQ']); } finally { await producer.close(); await consumer1.close(); await consumer2.close(); } });
Exchangeの名前はlogs
とします。
const exchange = 'logs';
プロデューサーをひとつ、コンシューマーを2つ作成します。
const producer = await Producer.create(url, exchange); const consumer1 = await Consumer.create('consumer1', url, exchange); const consumer2 = await Consumer.create('consumer2', url, exchange);
それぞれのコンシュマーをスタートしてから、プロデューサーからメッセージをExchangeに送信する度に、両方のコンシューマーで
メッセージを受信できることが確認できます。
await consumer1.start(); await consumer2.start(); producer.send('Hello World'); await setTimeout(2 * 1000); expect(consumer1.getReceivedMessage()).toEqual(['Hello World']); expect(consumer2.getReceivedMessage()).toEqual(['Hello World']); producer.send('Hello RabbitMQ'); await setTimeout(2 * 1000); expect(consumer1.getReceivedMessage()).toEqual(['Hello World', 'Hello RabbitMQ']); expect(consumer2.getReceivedMessage()).toEqual(['Hello World', 'Hello RabbitMQ']);
実行。
$ npm test
コンソール上のログは、こんな感じになります。
> publish-subscribe@1.0.0 test > jest console.log [2023-04-04T16:48:14.669Z] waiting for message, for queue, amq.gen-9ER3hYLMjeN56pYaUQq5SA at log (src/log.ts:24:11) console.log [2023-04-04T16:48:14.708Z] waiting for message, for queue, amq.gen-USWfLj95zKeTiOjBqJXHQQ at log (src/log.ts:24:11) console.log [2023-04-04T16:48:14.713Z] [x] Send 'Hello World' at log (src/log.ts:24:11) console.log [2023-04-04T16:48:14.718Z] [consumer2] Received, Hello World at log (src/log.ts:24:11) console.log [2023-04-04T16:48:14.731Z] [consumer1] Received, Hello World at log (src/log.ts:24:11) console.log [2023-04-04T16:48:16.724Z] [x] Send 'Hello RabbitMQ' at log (src/log.ts:24:11) console.log [2023-04-04T16:48:16.729Z] [consumer2] Received, Hello RabbitMQ at log (src/log.ts:24:11) console.log [2023-04-04T16:48:16.734Z] [consumer1] Received, Hello RabbitMQ at log (src/log.ts:24:11) PASS test/publish-subscribe.test.ts (6.323 s) ✓ publish subscribe test (4370 ms) Test Suites: 1 passed, 1 total Tests: 1 passed, 1 total Snapshots: 0 total Time: 6.98 s Ran all test suites.
OKですね。
また、テスト実行中に以下のコマンドを実行してみましょう。
$ sudo -u rabbitmq rabbitmqctl list_exchanges $ sudo -u rabbitmq rabbitmqctl list_queues $ sudo -u rabbitmq rabbitmqctl list_bindings
結果は、それぞれ以下です。
$ sudo -u rabbitmq rabbitmqctl list_exchanges Listing exchanges for vhost / ... name type amq.match headers amq.rabbitmq.trace topic logs fanout direct amq.fanout fanout amq.direct direct amq.topic topic amq.headers headers $ sudo -u rabbitmq rabbitmqctl list_queues Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages amq.gen-T5dF5HyFmLcK3rwNG1O6Lg 0 amq.gen-BK-ZwFrhhJrragtXXE-fEg 0 $ sudo -u rabbitmq rabbitmqctl list_bindings Listing bindings for vhost /... source_name source_kind destination_name destination_kind routing_key arguments exchange amq.gen-T5dF5HyFmLcK3rwNG1O6Lg queue amq.gen-T5dF5HyFmLcK3rwNG1O6Lg [] exchange amq.gen-BK-ZwFrhhJrragtXXE-fEg queue amq.gen-BK-ZwFrhhJrragtXXE-fEg [] logs exchange amq.gen-BK-ZwFrhhJrragtXXE-fEg queue [] logs exchange amq.gen-T5dF5HyFmLcK3rwNG1O6Lg queue []
Exchangeとしてlogs
があり、
logs fanout direct
一時キューも確認できます。
amq.gen-T5dF5HyFmLcK3rwNG1O6Lg 0 amq.gen-BK-ZwFrhhJrragtXXE-fEg 0
Exchangeと一時キューの紐付けも確認できますね。
source_name source_kind destination_name destination_kind routing_key arguments exchange amq.gen-T5dF5HyFmLcK3rwNG1O6Lg queue amq.gen-T5dF5HyFmLcK3rwNG1O6Lg [] exchange amq.gen-BK-ZwFrhhJrragtXXE-fEg queue amq.gen-BK-ZwFrhhJrragtXXE-fEg [] logs exchange amq.gen-BK-ZwFrhhJrragtXXE-fEg queue [] logs exchange amq.gen-T5dF5HyFmLcK3rwNG1O6Lg queue []
これを見ると、空文字で指定したルーティングキーは、自動で生成される感じになっているのでしょうか…?
ひとまず、動作確認できたのでここまでですね。
まとめ
RabbitMQのJavaScriptチュートリアルの「Publish/Subscribe」を、TypeScriptで試してみました。
「Work Queues」の時にある程度やり方を確立したので、今回は割とあっさりできたかなと思います。