これは、なにをしたくて書いたもの?
RabbitMQのチュートリアルをJavaScriptクライアント+TypeScriptでやっていこう、ということで。
今回は「Work Queues」を扱います。こちらですね。
RabbitMQ tutorial - Work Queues — RabbitMQ
Work Queues
最初のチュートリアルでは、ひとつのキューに対して、それぞれひとつずつのプロデューサーとコンシューマーを作成しました。
RabbitMQ tutorial - "Hello World!" — RabbitMQ
今回は、複数のコンシューマー(ワーカー)に対して、時間のかかるタスクを分散して処理してもらうチュートリアルになります。
RabbitMQ tutorial - Work Queues — RabbitMQ
このパターンを使うことで、プロデューサーはキューに対してメッセージを送信、ワーカーに処理を依頼することで重たい処理の
完了を待つことを回避することができます。
ワーカーは、キューに入ったメッセージを順次読み出します。
また、このチュートリアルは以下もポイントです。
- ワーカーに対する、メッセージのラウンドロビンディスパッチ
- メッセージの配信確認(ACK)
- メッセージの永続化
では、進めていきましょう。
環境
今回の環境は、こちら。
$ sudo -u rabbitmq rabbitmqctl version 3.11.10
RabbitMQは、172.17.0.2で動作しているものとします。
ユーザーも作成しておきます。
$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password $ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*' $ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring
使用するNode.jsのバージョン。
$ node --version v18.15.0 $ npm --version 9.5.0
準備
まずはNode.jsプロジェクトを作成します。
$ npm init -y $ npm i -D typescript $ npm i -D @types/node@v18 $ npm i -D prettier $ npm i -D jest @types/jest $ npm i -D esbuild esbuild-jest
確認は、テストコードで行うことにします。
RabbitMQに接続するためのamqplibとその型定義をインストール。
$ npm i amqplib $ npm i -D @types/amqplib
$ mkdir src test
依存関係は、こうなりました。
"devDependencies": { "@types/amqplib": "^0.10.1", "@types/jest": "^29.5.0", "@types/node": "^18.15.3", "esbuild": "^0.17.12", "esbuild-jest": "^0.5.0", "jest": "^29.5.0", "prettier": "^2.8.4", "typescript": "^5.0.2" }, "dependencies": { "amqplib": "^0.10.3" }
scripts
。
"scripts": { "build": "tsc --project .", "build:watch": "tsc --project . --watch", "typecheck": "tsc --project ./tsconfig.typecheck.json", "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch", "test": "jest", "format": "prettier --write src test" },
各種設定ファイル。
tsconfig.json
{ "compilerOptions": { "target": "esnext", "module": "commonjs", "moduleResolution": "node", "lib": ["esnext"], "baseUrl": "./src", "outDir": "dist", "strict": true, "forceConsistentCasingInFileNames": true, "noFallthroughCasesInSwitch": true, "noImplicitOverride": true, "noImplicitReturns": true, "noPropertyAccessFromIndexSignature": true, "esModuleInterop": true }, "include": [ "src" ] }
tsconfig.typecheck.json
{ "extends": "./tsconfig", "compilerOptions": { "baseUrl": "./", "noEmit": true }, "include": [ "src", "test" ] }
.prettierrc.json
{ "singleQuote": true, "printWidth": 120 }
jest.config.js
module.exports = { testEnvironment: 'node', transform: { "^.+\\.tsx?$": "esbuild-jest" } };
RabbitMQのJavaScriptチュートリアルの「Work Queues」をTypeScriptとJestで試す
では、こちらをTypeScriptとJestで試していきます。
RabbitMQ tutorial - Work Queues — RabbitMQ
もとのソースコードはプロデューサーとコンシューマー × 2をそれぞれ別プロセスで起動しているのですが、今回は頑張ってこれを
テストコード内で表現してみましょう。
プロデューサーを作成する
最初はプロデューサーを作成します。今回はプロデューサーもワーカーも、クラスとして作成することにしました。
src/Producer.ts
import amqp from 'amqplib'; export class Producer { private conn: amqp.Connection; private channel: amqp.Channel; private queue: string; constructor(conn: amqp.Connection, queue: string, channel: amqp.Channel) { this.conn = conn; this.queue = queue; this.channel = channel; } static async create(url: string, queue: string): Promise<Producer> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); channel.assertQueue(queue, { durable: true }); return new Producer(conn, queue, channel); } send(message: string): void { this.channel.sendToQueue(this.queue, Buffer.from(message), { persistent: true }); console.log(" [x] Send '%s'", message); } async close(): Promise<void> { await this.conn.close(); } }
インスタンスを作成して、send
メソッドでメッセージ送信、最後にclose
する使い方になります。
最初のチュートリアルと使っているAPIはほぼ変わらないのですが、以下の2箇所が異なります。
channel.assertQueue(queue, { durable: true }); this.channel.sendToQueue(this.queue, Buffer.from(message), { persistent: true });
Channel#assertQueue
に指定するオプションであるdurable
をtrue
にすることで、RabbitMQが再起動してもメッセージが失われない
キューとすることができます。
Channel-oriented API reference / API reference / Channel / assertQueue
また、永続化するメッセージであることを示すには、ConfirmChannel#sendToQueue
のオプションであるpersistent
をtrue
にする必要が
あります。
Channel-oriented API reference / API reference / ConfirmChannel / sendToQueue
このメソッドにはオプションの説明が記載されていないのですが、こちらを見れば良さそうですね。
Channel-oriented API reference / API reference / Channel / publish
ワーカーを作成する
次に、ワーカーを作成します。
src/Worker.ts
import amqp from 'amqplib'; import { setTimeout } from 'timers/promises'; export class Worker { private name: string; private conn: amqp.Connection; private channel: amqp.Channel; private queue: string; private receivedMessages: string[] = []; constructor(name: string, conn: amqp.Connection, queue: string, channel: amqp.Channel) { this.name = name; this.conn = conn; this.queue = queue; this.channel = channel; } static async create(name: string, url: string, queue: string): Promise<Worker> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); await channel.assertQueue(queue, { durable: true }); return new Worker(name, conn, queue, channel); } async startWithNoAck(): Promise<void> { await this.channel.consume( this.queue, async (message) => { const contentAsString = message!.content.toString(); const secs = contentAsString.split('.').length - 1; console.log(' %s: [x] Received %s, Sleep Time %d', this.name, contentAsString, secs); await setTimeout(secs * 1000); this.receivedMessages.push(contentAsString); }, { noAck: true, } ); } async startWithAck(): Promise<void> { this.channel.prefetch(1); await this.channel.consume( this.queue, async (message) => { const contentAsString = message!.content.toString(); const secs = contentAsString.split('.').length - 1; console.log(' %s: [x] Received %s, Sleep Time %d', this.name, contentAsString, secs); await setTimeout(secs * 1000); this.receivedMessages.push(contentAsString); this.channel.ack(message!); }, { noAck: false, } ); } getReceivedMessages(): string[] { return [...this.receivedMessages]; } async close(): Promise<void> { await this.conn.close(); } }
インスタンスを作成した後に、start...
メソッドでメッセージの受信を行い、getReceivedMessages
でその時点で受信している
メッセージの配列を返します。最後にclose
、ですね。
キューの作成(存在確認)は、プロデューサーと同じようにdurable
をtrue
にしておきます。
await channel.assertQueue(queue, { durable: true });
メッセージの受信は、2通り用意しています。
まずは、Channel#consume
の呼び出しの際にオプションのnoAck
をtrue
に指定するパターン。
async startWithNoAck(): Promise<void> { await this.channel.consume( this.queue, async (message) => { const contentAsString = message!.content.toString(); const secs = contentAsString.split('.').length - 1; console.log(' %s: [x] Received %s, Sleep Time %d', this.name, contentAsString, secs); await setTimeout(secs * 1000); this.receivedMessages.push(contentAsString); }, { noAck: true, } ); }
noAck
は、true
にするとブローカーはコンシューマーに配信したメッセージの確認を行わず、ブローカーがメッセージをネットワークに
送信した時点ですぐにメッセージを削除します。
false
にすると、コンシューマーがメッセージの確認結果をブローカーに返す必要があります。この時、確認を行わなかった場合は
Channel
を閉じるとメッセージが再度キューに入れられることがあります。
デフォルトはfalse
です。
Channel-oriented API reference / API reference / Channel / consume
こちらのメソッド定義では、noAck
をtrue
にしているのですぐにRabbitMQブローカーからすぐにメッセージを削除することになります。
もうひとつの定義。
async startWithAck(): Promise<void> { this.channel.prefetch(1); await this.channel.consume( this.queue, async (message) => { const contentAsString = message!.content.toString(); const secs = contentAsString.split('.').length - 1; console.log(' %s: [x] Received %s, Sleep Time %d', this.name, contentAsString, secs); await setTimeout(secs * 1000); this.receivedMessages.push(contentAsString); this.channel.ack(message!); }, { noAck: false, } ); }
こちらではnoAck
をfalse
にしています。
こうすると、コンシューマーからブローカーに確認結果を送り返す必要があります。それが以下のChannel#ack
の部分です。
this.channel.ack(message!);
Channel-oriented API reference / API reference / Channel / ack
第1引数には、確認対象のメッセージを指定します。
第2引数をtrue
にすると指定されたメッセージおよびそれ以前のメッセージをすべて確認済みにします。省略した場合は、
false
と同じとみなされます。
対象のChannel
上のすべてのメッセージを確認対象とする、Channel#ackAll
というメソッドもあります。
Channel-oriented API reference / API reference / Channel / ackAll
この反対として、対象のメッセージを拒否するChannel#nack
、未処理のメッセージをすべて拒否するChannel#nackAll
もあります。
requeue
をtrue
にすると、キューに戻す動きになるようです。
- Channel-oriented API reference / API reference / Channel / nack
- Channel-oriented API reference / API reference / Channel / nackAll
また、デフォルトでRabbitMQはメッセージをラウンドロビンでコンシューマーに送信します。ただ、これはメッセージを単純に
ディスパッチするので、RabbitMQブローカーがコンシューマーにひとつずつメッセージを送信する(一気に複数のメッセージを
送信しないようにする)ようにしています。
それが以下ですね。
this.channel.prefetch(1);
Channel-oriented API reference / API reference / Channel / prefetch
これは、コンシューマー単位のプリフェッチ数を設定します。ここで指定する値は、コンシューマー単位で確認(ACK)を待つ最大数です。
値を設定すると、ひとつまたはそれ以上の確認(ACK)がコンシューマーから送信されるまで、ブローカーはそのコンシューマーに
メッセージを送信しなくなります。
テストコードで確認する
最後は、テストコードを書いて確認しましょう。
test/work-queues.test.ts
import { setTimeout } from 'timers/promises'; import { Producer } from '../src/Producer'; import { Worker } from '../src/Worker'; jest.setTimeout(30 * 1000); test('work queues, no ack', async () => { const url = 'amqp://kazuhira:password@172.17.0.2:5672'; const queue = 'task_queue_auto_ack'; const producer = await Producer.create(url, queue); const worker1 = await Worker.create('no-ack-worker1', url, queue); const worker2 = await Worker.create('no-ack-worker2', url, queue); try { await worker1.startWithNoAck(); await worker2.startWithNoAck(); producer.send('First message.'); producer.send('Second message..'); producer.send('Third message...'); producer.send('Fourth message....'); producer.send('Fifth message.....'); await setTimeout(2 * 1000); // send wait await setTimeout(10 * 1000); // receive wait expect(worker1.getReceivedMessages()).toEqual(['First message.', 'Third message...', 'Fifth message.....']); expect(worker2.getReceivedMessages()).toEqual(['Second message..', 'Fourth message....']); } finally { await producer.close(); await worker1.close(); await worker2.close(); } }); test('work queues, with ack', async () => { const url = 'amqp://kazuhira:password@172.17.0.2:5672'; const queue = 'task_queue_manual_ack'; const producer = await Producer.create(url, queue); const worker1 = await Worker.create('with-ack-worker1', url, queue); const worker2 = await Worker.create('with-ack-worker2', url, queue); try { await worker1.startWithAck(); await worker2.startWithAck(); producer.send('First message.'); producer.send('Second message..'); producer.send('Third message...'); producer.send('Fourth message....'); producer.send('Fifth message.....'); await setTimeout(2 * 1000); // send wait await setTimeout(10 * 1000); // receive wait expect(worker1.getReceivedMessages()).toEqual(['First message.', 'Third message...', 'Fifth message.....']); expect(worker2.getReceivedMessages()).toEqual(['Second message..', 'Fourth message....']); } finally { await producer.close(); await worker1.close(); await worker2.close(); } });
それぞれ、確認(ACK)なし、確認(ACK)ありのパターンです。今回は結果は同じですが、メッセージがラウンドロビンで
振り分けられていることも確認できます。
では、実行。
$ npm test
コンソール上のログは、こんな感じになります。
> work-queues@1.0.0 test > jest console.log [x] Send 'First message.' at Producer.send (src/Producer.ts:48:13) console.log [x] Send 'Second message..' at Producer.send (src/Producer.ts:48:13) console.log [x] Send 'Third message...' at Producer.send (src/Producer.ts:48:13) console.log [x] Send 'Fourth message....' at Producer.send (src/Producer.ts:48:13) console.log [x] Send 'Fifth message.....' at Producer.send (src/Producer.ts:48:13) console.log no-ack-worker1: [x] Received First message., Sleep Time 1 at channel.consume.noAck (src/Worker.ts:55:17) console.log no-ack-worker2: [x] Received Second message.., Sleep Time 2 at channel.consume.noAck (src/Worker.ts:55:17) console.log no-ack-worker2: [x] Received Fourth message...., Sleep Time 4 at channel.consume.noAck (src/Worker.ts:55:17) console.log no-ack-worker1: [x] Received Third message..., Sleep Time 3 at channel.consume.noAck (src/Worker.ts:55:17) console.log no-ack-worker1: [x] Received Fifth message....., Sleep Time 5 at channel.consume.noAck (src/Worker.ts:55:17) console.log [x] Send 'First message.' at Producer.send (src/Producer.ts:48:13) console.log [x] Send 'Second message..' at Producer.send (src/Producer.ts:48:13) console.log [x] Send 'Third message...' at Producer.send (src/Producer.ts:48:13) console.log [x] Send 'Fourth message....' at Producer.send (src/Producer.ts:48:13) console.log [x] Send 'Fifth message.....' at Producer.send (src/Producer.ts:48:13) console.log with-ack-worker1: [x] Received First message., Sleep Time 1 at channel.consume.noAck (src/Worker.ts:71:17) console.log with-ack-worker2: [x] Received Second message.., Sleep Time 2 at channel.consume.noAck (src/Worker.ts:71:17) console.log with-ack-worker1: [x] Received Third message..., Sleep Time 3 at channel.consume.noAck (src/Worker.ts:71:17) console.log with-ack-worker2: [x] Received Fourth message...., Sleep Time 4 at channel.consume.noAck (src/Worker.ts:71:17) console.log with-ack-worker1: [x] Received Fifth message....., Sleep Time 5 at channel.consume.noAck (src/Worker.ts:71:17) PASS test/work-queues.test.ts (24.796 s) ✓ work queues, no ack (12259 ms) ✓ work queues, with ack (12215 ms) Test Suites: 1 passed, 1 total Tests: 2 passed, 2 total Snapshots: 0 total Time: 24.864 s, estimated 25 s Ran all test suites.
OKですね。
まとめ
RabbitMQのJavaScriptチュートリアルの「Work Queues」を、TypeScriptで試してみました。
テストコードでの確認にあたって、扱いやすいようにクラスにまとめたりとだいぶ四苦八苦しましたが、最終的にチュートリアルと
同じことが実現できたので良かったのかなと。
TypeScriptの学習も兼ねているので。