これは、なにをしたくて書いたもの?
RabbitMQのチュートリアルをJavaScriptクライアント+TypeScriptでやっていこう、ということで。
今回は「Routing」を扱います。こちらですね。
RabbitMQ tutorial - Routing — RabbitMQ
Routing
「Routing」では、メッセージをあるルールに従って特定のコンシューマーに配信します。
RabbitMQ tutorial - Routing — RabbitMQ
「Publish/Subscribe」では、以下のようにExchangeを作った後にキューをバインディング(関連付け)しました。
channel.bindQueue(q.queue, exchange, '');
これは、「キューが特定のExchangeに関心がある」ということを指しています。
Channel-oriented API reference / API reference / Channel / bindQueue
バインディングを行う時に、追加のキーを指定できます。これをバインディングキーと呼びます。
channel.bindQueue(queue_name, exchange_name, 'black');
バインディングキーは、Exchangeの種類によって意味が変わります。たとえばFanout exchangeだと、この値を無視します。
Exchangeの種類は、こちらですね。
- Direct exchange
- Fanout exchange
- Topic exchange
- Headers exchange
AMQP 0-9-1 Model Explained / Exchanges and Exchange Types
Fanout exchangeは、すべてのメッセージをすべてのコンシューマーにブロードキャストするものでした。
Direct exchange
Direct exchangeを使った場合、ルーティングキーに従ってメッセージを配信するキューを振り分けることができます。
AMQP 0-9-1 Model Explained / Exchanges and Exchange Types / Direct Exchange
ひとつのExchangeに、複数のキューを紐付けることもできます。
また、同じルーティングキーで複数のキューをバインドすることもできます。
では、ここまでの内容をNode.js+TypeScriptで試してみます。
環境
今回の環境は、こちら。
$ sudo -u rabbitmq rabbitmqctl version 3.12.4
RabbitMQは、172.17.0.2で動作しているものとします。
ユーザーも作成しておきます。
$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password $ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*' $ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring
使用するNode.jsのバージョン。
$ node --version v18.17.1 $ npm --version 9.6.7
準備
では、まずはNode.jsプロジェクトを作成します。
$ npm init -y $ npm i -D typescript $ npm i -D @types/node@v18 $ npm i -D prettier $ npm i -D jest @types/jest $ npm i -D esbuild esbuild-jest
確認は、テストコードで行います。
RabbitMQに接続するためのamqplibとその型定義をインストール。
$ npm i amqplib $ npm i -D @types/amqplib
$ mkdir src test
依存関係は、こうなりました。
"devDependencies": { "@types/amqplib": "^0.10.1", "@types/jest": "^29.5.5", "@types/node": "^18.17.18", "esbuild": "^0.19.3", "esbuild-jest": "^0.5.0", "jest": "^29.7.0", "prettier": "^3.0.3", "typescript": "^5.2.2" }, "dependencies": { "amqplib": "^0.10.3" }
scripts
。
"scripts": { "build": "tsc --project .", "build:watch": "tsc --project . --watch", "typecheck": "tsc --project ./tsconfig.typecheck.json", "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch", "test": "jest", "format": "prettier --write src test" },
各種設定ファイル。
tsconfig.json
{ "compilerOptions": { "target": "esnext", "module": "commonjs", "moduleResolution": "node", "lib": ["esnext"], "baseUrl": "./src", "outDir": "dist", "strict": true, "forceConsistentCasingInFileNames": true, "noFallthroughCasesInSwitch": true, "noImplicitOverride": true, "noImplicitReturns": true, "noPropertyAccessFromIndexSignature": true, "esModuleInterop": true }, "include": [ "src" ] }
tsconfig.typecheck.json
{ "extends": "./tsconfig", "compilerOptions": { "baseUrl": "./", "noEmit": true }, "include": [ "src", "test" ] }
.prettierrc.json
{ "singleQuote": true, "printWidth": 120 }
jest.config.js
module.exports = { testEnvironment: 'node', transform: { "^.+\\.tsx?$": "esbuild-jest" } };
RabbitMQのJavaScriptチュートリアルの「Routing」をTypeScriptとJestで試す
では、こちらをTypeScriptとJestで試していきます。
RabbitMQ tutorial - Routing — RabbitMQ
チュートリアル内ではプロデューサーとコンシューマーをそれぞれ別プロセスで起動していますが、今回はテストコード内で表現します。
プロデューサーを作成する
まずはプロデューサーから。クラスとして作成します。
src/Producer.ts
import amqp from 'amqplib'; import { log } from './log'; export class Producer { private conn: amqp.Connection; private channel: amqp.Channel; private exchange: string; constructor(conn: amqp.Connection, channel: amqp.Channel, exchange: string) { this.conn = conn; this.channel = channel; this.exchange = exchange; } static async create(url: string, exchange: string): Promise<Producer> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); await channel.assertExchange(exchange, 'direct', { durable: false, }); return new Producer(conn, channel, exchange); } send(routingKey: string, message: string): void { this.channel.publish(this.exchange, routingKey, Buffer.from(message)); log(` [x] Sent ${routingKey}: '${message}'`); } async close(): Promise<void> { await this.conn.close(); } }
Direct Exchangeを作成。
await channel.assertExchange(exchange, 'direct', { durable: false, });
Channel-oriented API reference / API reference / Channel / assertExchange
メッセージをExchangeに送信する箇所は、こちら。
send(routingKey: string, message: string): void { this.channel.publish(this.exchange, routingKey, Buffer.from(message)); log(` [x] Sent ${routingKey}: '${message}'`); }
Channel#publish
の第2引数にルーティングキーを指定します。
Channel-oriented API reference / API reference / Channel / publish
ブロードキャストしている時は、このルーティングキーに空の文字列を指定していました。
ちなみに、log
というのはこういう定義です。
src/log.ts
export function log(message: string): void { console.log(`[${new Date().toISOString()}] ${message}`); }
こちらはコンシューマーでも使います。
コンシューマーを作成する
次は、コンシューマーを作成します。
src/Consumer.ts
import amqp from 'amqplib'; import { log } from './log'; export class Consumer { private name: string; private conn: amqp.Connection; private channel: amqp.Channel; private exchange: string; private queue: string; private routingKeys: string[]; private receivedMessages: string[] = []; constructor( name: string, conn: amqp.Connection, channel: amqp.Channel, exchange: string, queue: string, routingKeys: string[], ) { this.name = name; this.conn = conn; this.channel = channel; this.exchange = exchange; this.queue = queue; this.routingKeys = routingKeys; } static async create(name: string, url: string, exchange: string, routingKeys: string[]): Promise<Consumer> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); await channel.assertExchange(exchange, 'direct', { durable: false }); const queue = await channel.assertQueue('', { exclusive: true }); return new Consumer(name, conn, channel, exchange, queue.queue, routingKeys); } async start(): Promise<void> { for (const routingKey of this.routingKeys) { await this.channel.bindQueue(this.queue, this.exchange, routingKey); } await this.channel.consume(this.queue, async (message) => { const messageAsString = message!.content.toString(); log(` [${this.name} - ${message?.fields.routingKey}] Received, ${messageAsString}`); this.receivedMessages.push(messageAsString); }, { noAck: true }); log(` [${this.name} - [${this.routingKeys.join(', ')}]] waiting for message, for queue[${this.queue}]`); } getReceivedMessages(): string[] { return [...this.receivedMessages]; } async close(): Promise<void> { await this.conn.close(); } }
Channel#assertExchange
まではプロデューサーと同じで、その後にChannel#assertQueue
を呼び出します。
await channel.assertExchange(exchange, 'direct', { durable: false }); const queue = await channel.assertQueue('', { exclusive: true });
Channel-oriented API reference / API reference / Channel / assertQueue
ここで、キューの名前を空文字列(''
)で指定しているところがポイントで、こうするとamq.gen-JzTY20BRgKO-HjmUJj0wLg
のような
ランダムな名前でキューが作成されます。
またexclusive
をtrue
にしているので、これは排他的な一時キューとなり、接続がクローズされると削除されるものになります。
そして、キューにバインドします。
for (const routingKey of this.routingKeys) { await this.channel.bindQueue(this.queue, this.exchange, routingKey); }
この時、第3引数にルーティングキーを渡すことで、プロデューサーが指定したルーティングキーと一致する場合にこのコンシューマーは
キューからメッセージを受信できます。
なお、今回はひとつのコンシューマーで複数のルーティングキーをバインドできるようにしています。
メッセージの受信。
await this.channel.consume(this.queue, async (message) => { const messageAsString = message!.content.toString(); log(` [${this.name} - ${message?.fields.routingKey}] Received, ${messageAsString}`); this.receivedMessages.push(messageAsString); }, { noAck: true });
どのルーティングキーによるメッセージなのかは、受信したメッセージの中身でわかるようです。
noAck
をtrue
にしているので、メッセージを受信したらその時点でブローカーからメッセージを削除します。
これでプロデューサーとコンシューマーの準備は完了です。
テストコードで確認する
確認はテストコードで行います。
test/routing.test.ts
import { setTimeout } from 'node:timers/promises'; import { Consumer } from '../src/Consumer'; import { Producer } from '../src/Producer'; test('routing test', async () => { const url = 'amqp://kazuhira:password@172.17.0.2:5672'; const exchange = 'direct_logs'; const producer = await Producer.create(url, exchange); const infoLevelConsumer = await Consumer.create('info-level-consumer', url, exchange, ['info']); const warnLevelConsumer = await Consumer.create('warn-level-consumer', url, exchange, ['warn']); const errorLevelConsumer = await Consumer.create('error-level-consumer', url, exchange, ['error']); const allLevelConsumer = await Consumer.create('all-level-consumer', url, exchange, ['info', 'warn', 'error']); try { await infoLevelConsumer.start(); await warnLevelConsumer.start(); await errorLevelConsumer.start(); await allLevelConsumer.start(); // routing key, info producer.send('info', 'Blue'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']); expect(warnLevelConsumer.getReceivedMessages()).toStrictEqual([]); expect(errorLevelConsumer.getReceivedMessages()).toStrictEqual([]); expect(allLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']); // routing key, warn producer.send('warn', 'Green'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']); expect(warnLevelConsumer.getReceivedMessages()).toStrictEqual(['Green']); expect(errorLevelConsumer.getReceivedMessages()).toStrictEqual([]); expect(allLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue', 'Green']); // routing key, error producer.send('error', 'Red'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']); expect(warnLevelConsumer.getReceivedMessages()).toStrictEqual(['Green']); expect(errorLevelConsumer.getReceivedMessages()).toStrictEqual(['Red']); expect(allLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue', 'Green', 'Red']); } finally { await producer.close(); await infoLevelConsumer.close(); await warnLevelConsumer.close(); await errorLevelConsumer.close(); await allLevelConsumer.close(); } });
第4引数の配列がルーティングキーです。お題はチュートリアルと同じようにログレベルにして、最後のコンシューマーだけすべての
メッセージを受け取るように複数のルーティングキーを指定しています。
const infoLevelConsumer = await Consumer.create('info-level-consumer', url, exchange, ['info']); const warnLevelConsumer = await Consumer.create('warn-level-consumer', url, exchange, ['warn']); const errorLevelConsumer = await Consumer.create('error-level-consumer', url, exchange, ['error']); const allLevelConsumer = await Consumer.create('all-level-consumer', url, exchange, ['info', 'warn', 'error']);
あとはコンシューマーを開始して、ルーティングキーを指定してメッセージを送り、確認します。
await infoLevelConsumer.start(); await warnLevelConsumer.start(); await errorLevelConsumer.start(); await allLevelConsumer.start(); // routing key, info producer.send('info', 'Blue'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']); expect(warnLevelConsumer.getReceivedMessages()).toStrictEqual([]); expect(errorLevelConsumer.getReceivedMessages()).toStrictEqual([]); expect(allLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']);
最後がこちらですが、各レベルごとにメッセージがコンシューマーに振り分けられ、最後のコンシューマーだけがすべてのメッセージを
受け取ることを確認します。
// routing key, error producer.send('error', 'Red'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']); expect(warnLevelConsumer.getReceivedMessages()).toStrictEqual(['Green']); expect(errorLevelConsumer.getReceivedMessages()).toStrictEqual(['Red']); expect(allLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue', 'Green', 'Red']);
テストを実行。
$ npm run test
出力されるログは、こんな感じになります。
> routing@1.0.0 test > jest console.log [2023-09-21T14:28:48.467Z] [info-level-consumer - [info]] waiting for message, for queue[amq.gen-PQK-y4D3rWGBwpC0i8E1uA] at log (src/log.ts:24:11) console.log [2023-09-21T14:28:48.523Z] [warn-level-consumer - [warn]] waiting for message, for queue[amq.gen-qukTzE39hc3wInQ9kGBNpw] at log (src/log.ts:24:11) console.log [2023-09-21T14:28:48.529Z] [error-level-consumer - [error]] waiting for message, for queue[amq.gen-led1Rj7VGecWDwzt8_cz1g] at log (src/log.ts:24:11) console.log [2023-09-21T14:28:48.538Z] [all-level-consumer - [info, warn, error]] waiting for message, for queue[amq.gen-H6nNi2vUJ25tgZu3WF4eew] at log (src/log.ts:24:11) console.log [2023-09-21T14:28:48.544Z] [x] Sent info: 'Blue' at log (src/log.ts:24:11) console.log [2023-09-21T14:28:48.551Z] [info-level-consumer - info] Received, Blue at log (src/log.ts:24:11) console.log [2023-09-21T14:28:48.556Z] [all-level-consumer - info] Received, Blue at log (src/log.ts:24:11) console.log [2023-09-21T14:28:49.558Z] [x] Sent warn: 'Green' at log (src/log.ts:24:11) console.log [2023-09-21T14:28:49.563Z] [warn-level-consumer - warn] Received, Green at log (src/log.ts:24:11) console.log [2023-09-21T14:28:49.568Z] [all-level-consumer - warn] Received, Green at log (src/log.ts:24:11) console.log [2023-09-21T14:28:50.565Z] [x] Sent error: 'Red' at log (src/log.ts:24:11) console.log [2023-09-21T14:28:50.569Z] [error-level-consumer - error] Received, Red at log (src/log.ts:24:11) console.log [2023-09-21T14:28:50.573Z] [all-level-consumer - error] Received, Red at log (src/log.ts:24:11) PASS test/routing.test.ts ✓ routing test (3489 ms) Test Suites: 1 passed, 1 total Tests: 1 passed, 1 total Snapshots: 0 total Time: 3.73 s, estimated 4 s Ran all test suites.
OKですね。
また、テスト実行中に以下のコマンドを実行してみます。
$ sudo -u rabbitmq rabbitmqctl list_exchanges $ sudo -u rabbitmq rabbitmqctl list_queues $ sudo -u rabbitmq rabbitmqctl list_bindings
結果。
$ sudo -u rabbitmq rabbitmqctl list_exchanges Listing exchanges for vhost / ... name type amq.rabbitmq.trace topic direct amq.direct direct amq.match headers direct_logs direct amq.topic topic amq.headers headers amq.fanout fanout $ sudo -u rabbitmq rabbitmqctl list_queues Listing queues for vhost / ... name messages amq.gen-SeeoxoV2bpfFOWpimilItg 0 amq.gen-qW51Nipw2X9KE5IUCfVKGw 0 amq.gen-Y3RxBoKwYSyFxk3O_b9AFg 0 amq.gen-Nitvmj1v32HsLLQ0jRm6JQ 0 $ sudo -u rabbitmq rabbitmqctl list_bindings Listing bindings for vhost /... source_name source_kind destination_name destination_kind routing_key arguments exchange amq.gen-SeeoxoV2bpfFOWpimilItg queue amq.gen-SeeoxoV2bpfFOWpimilItg [] exchange amq.gen-qW51Nipw2X9KE5IUCfVKGw queue amq.gen-qW51Nipw2X9KE5IUCfVKGw [] exchange amq.gen-Y3RxBoKwYSyFxk3O_b9AFg queue amq.gen-Y3RxBoKwYSyFxk3O_b9AFg [] exchange amq.gen-Nitvmj1v32HsLLQ0jRm6JQ queue amq.gen-Nitvmj1v32HsLLQ0jRm6JQ [] direct_logs exchange amq.gen-SeeoxoV2bpfFOWpimilItg queue error [] direct_logs exchange amq.gen-Y3RxBoKwYSyFxk3O_b9AFg queue error [] direct_logs exchange amq.gen-Nitvmj1v32HsLLQ0jRm6JQ queue info [] direct_logs exchange amq.gen-Y3RxBoKwYSyFxk3O_b9AFg queue info [] direct_logs exchange amq.gen-Y3RxBoKwYSyFxk3O_b9AFg queue warn [] direct_logs exchange amq.gen-qW51Nipw2X9KE5IUCfVKGw queue warn []
Direct Exchangeがあり
direct_logs direct
一時キューがあり
amq.gen-SeeoxoV2bpfFOWpimilItg 0 amq.gen-qW51Nipw2X9KE5IUCfVKGw 0 amq.gen-Y3RxBoKwYSyFxk3O_b9AFg 0 amq.gen-Nitvmj1v32HsLLQ0jRm6JQ 0
Exchangeと一時キューの紐付けも確認できます。
source_name source_kind destination_name destination_kind routing_key arguments exchange amq.gen-SeeoxoV2bpfFOWpimilItg queue amq.gen-SeeoxoV2bpfFOWpimilItg [] exchange amq.gen-qW51Nipw2X9KE5IUCfVKGw queue amq.gen-qW51Nipw2X9KE5IUCfVKGw [] exchange amq.gen-Y3RxBoKwYSyFxk3O_b9AFg queue amq.gen-Y3RxBoKwYSyFxk3O_b9AFg [] exchange amq.gen-Nitvmj1v32HsLLQ0jRm6JQ queue amq.gen-Nitvmj1v32HsLLQ0jRm6JQ [] direct_logs exchange amq.gen-SeeoxoV2bpfFOWpimilItg queue error [] direct_logs exchange amq.gen-Y3RxBoKwYSyFxk3O_b9AFg queue error [] direct_logs exchange amq.gen-Nitvmj1v32HsLLQ0jRm6JQ queue info [] direct_logs exchange amq.gen-Y3RxBoKwYSyFxk3O_b9AFg queue info [] direct_logs exchange amq.gen-Y3RxBoKwYSyFxk3O_b9AFg queue warn [] direct_logs exchange amq.gen-qW51Nipw2X9KE5IUCfVKGw queue warn []
ルーティングキーもわかりますね。
こんなところでしょうか。
おわりに
RabbitMQのJavaScriptチュートリアルの「Routing」を、TypeScriptで試してみました。
けっこう久しぶりにやったのでだいぶ忘れていたのですが、前に書いたエントリーなどを見ながらいろいろ思い出せたので、割とあっさり
確認できました。
とりあえず、チュートリアルはひととおり通したいなと思います。