これは、なにをしたくて書いたもの?
RabbitMQのチュートリアルをJavaScriptクライアント+TypeScriptでやっていこう、ということで。
今回は「RPC」を扱います。
RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ
今回で、この一連のお題は最後にします。
RPC
「Work Queues」チュートリアルでは、ワークキューを使って時間のかかるタスクを複数のコンシューマー(ワーカー)に分散して
処理を行ってもらうものでした。
RabbitMQ tutorial - Work Queues — RabbitMQ
RabbitMQのJavaScriptチュートリアルの「Work Queues」をTypeScriptで試す - CLOVER🍀
今回は、ワーカー側で処理を実行してその結果を受け取るというパターンです。なので「RPC」ですね。
こんな感じで実現するようです。
- 2つのキューを使用する
- そのうちの片方は、RPCワーカー(サーバー)に対して、プロデューサー(クライアント)ごとに作成する匿名の排他的キュー
- コールバックキュー
- RPCリクエストを送る場合、クライアントはコールバックキューを指定する
reply_to
、どのリクエストに関するものなのかを特定するためのcorrelation_id
の2つのプロパティを指定する - クライアントは、キュー(コールバックキューではない方)にリクエストを送る
- RPCワーカーは、キューからリクエストを受け取るのを待ち、処理を終えると
reply_to
で指定されたキューにレスポンスを返す - クライアントはコールバックキューでデータを待ち、受け取った場合は
correlation_id
を確認し、リクエストで送信した値と一致する場合はリクエストに対するレスポンスと見なす
なお、RPCのチュートリアルページでは、以下の点をRPCの注意事項として記載しています。
- 呼び出している処理が実はローカルの関数呼び出しではなく、RPCであるかもしれないことに注意すること(特にプログラマーが気づいていない場合)
- システムを複雑化し、デバッグをしづらくする
- RPCの乱用は、ソフトウェアのシンプルさを失わせメンテナンスの難しいスパゲッティコードを生み出すかもしれない
関数呼び出し自体はプログラミングで一般的ですが、実はそれがリモート呼び出しだった場合はその事実が隠されやすいので、
注意しましょうということですね。
では、このあたりの動作をNode.js+TypeScriptで試してみます。
環境
今回の環境は、こちら。
$ sudo -u rabbitmq rabbitmqctl version 3.12.10
RabbitMQは、172.17.0.2で動作しているものとします。
ユーザーの作成。
$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password $ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*' $ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring
使用するNode.jsのバージョン。
$ node --version v18.19.0 $ npm --version 10.2.3
準備
まずはNode.jsプロジェクトを作成。
$ npm init -y $ npm i -D typescript $ npm i -D @types/node@v18 $ npm i -D prettier $ npm i -D jest @types/jest $ npm i -D esbuild esbuild-jest
確認は、テストコードで行います。
RabbitMQに接続するためのamqplibとその型定義をインストール。
$ npm i amqplib $ npm i -D @types/amqplib
$ mkdir src test
依存関係はこうなりました。
"devDependencies": { "@types/amqplib": "^0.10.4", "@types/jest": "^29.5.10", "@types/node": "^18.19.1", "esbuild": "^0.19.8", "esbuild-jest": "^0.5.0", "jest": "^29.7.0", "prettier": "^3.1.0", "typescript": "^5.3.2" }, "dependencies": { "amqplib": "^0.10.3" }
scripts
。
"scripts": { "build": "tsc --project .", "build:watch": "tsc --project . --watch", "typecheck": "tsc --project ./tsconfig.typecheck.json", "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch", "test": "jest", "format": "prettier --write src test" },
各種設定ファイル。
tsconfig.json
{ "compilerOptions": { "target": "esnext", "module": "commonjs", "moduleResolution": "node", "lib": ["esnext"], "baseUrl": "./src", "outDir": "dist", "strict": true, "forceConsistentCasingInFileNames": true, "noFallthroughCasesInSwitch": true, "noImplicitOverride": true, "noImplicitReturns": true, "noPropertyAccessFromIndexSignature": true, "esModuleInterop": true }, "include": [ "src" ] }
tsconfig.typecheck.json
{ "extends": "./tsconfig", "compilerOptions": { "baseUrl": "./", "noEmit": true }, "include": [ "src", "test" ] }
.prettierrc.json
{ "singleQuote": true, "printWidth": 120 }
jest.config.js
module.exports = { testEnvironment: 'node', transform: { "^.+\\.tsx?$": "esbuild-jest" } };
RabbitMQのJavaScriptチュートリアルの「RPC」をTypeScriptとJestで試す
では、こちらをTypeScriptとJestで試していきます。
RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ
チュートリアル内ではクライアントとサーバーをそれぞれ別プロセスで起動していますが、今回はテストコード内で表現して みます。
お題は加算にしましょう。以下をリクエストとレスポンスの型定義とします。
src/exchange.ts
export type Request = { a: number; b: number; }; export type Response = { result: number; };
クライアントで加算元の数を送信し、サーバーで加算してレスポンスとして返すものとします。
サーバー(コンシューマー)を作成する
最初にサーバーを作成します。RabbitMQではコンシューマーやワーカーと呼ばれる種類に相当します。
src/Server.ts
import { setTimeout } from 'node:timers/promises'; import amqp from 'amqplib'; import { Request, Response } from './exchange'; import { log } from './log'; export class Server { private name: string; private conn: amqp.Connection; private channel: amqp.Channel; private queue: string; constructor(name: string, conn: amqp.Connection, channel: amqp.Channel, queue: string) { this.name = name; this.conn = conn; this.channel = channel; this.queue = queue; } static async create(name: string, url: string, queue: string): Promise<Server> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); await channel.assertQueue(queue, { durable: false }); channel.prefetch(1); return new Server(name, conn, channel, queue); } async start(): Promise<void> { log(` [${this.name}] [x] Start Server, Awaiting RPC requests`); await this.channel.consume(this.queue, async (message) => { if (message !== null) { log(` [${this.name}] sleep, 3 sec...`); await setTimeout(3 * 1000); const request: Request = JSON.parse(message?.content.toString('utf-8')); const response: Response = { result: request.a + request.b, }; this.channel.sendToQueue(message?.properties.replyTo, Buffer.from(JSON.stringify(response)), { correlationId: message?.properties.correlationId, }); this.channel.ack(message); } }); } async close(): Promise<void> { await this.conn.close(); } }
キューを作成。
await channel.assertQueue(queue, { durable: false }); channel.prefetch(1);
Channel-oriented API reference / API reference / Channel / assertQueue
このキューが、サーバーとしてリクエストを受け取るためのキューになります。複数のサーバーでリクエストをできる限り均等に
処理するようにするには、Channel#prefetch
を指定するとよいそうです。
Channel-oriented API reference / API reference / Channel / prefetch
そして、キューからメッセージを受信したら、処理を行ってChannel#sendToQueue
でクライアントとの排他的キューにレスポンスを
送信します。
await this.channel.consume(this.queue, async (message) => { if (message !== null) { log(` [${this.name}] sleep, 3 sec...`); await setTimeout(3 * 1000); const request: Request = JSON.parse(message?.content.toString('utf-8')); const response: Response = { result: request.a + request.b, }; this.channel.sendToQueue(message?.properties.replyTo, Buffer.from(JSON.stringify(response)), { correlationId: message?.properties.correlationId, }); this.channel.ack(message); } });
Channel-oriented API reference / API reference / Channel / sendToQueu
この部分ですね。
this.channel.sendToQueue(message?.properties.replyTo, Buffer.from(JSON.stringify(response)), { correlationId: message?.properties.correlationId, });
送信先のキューは、リクエストのメッセージのプロパティに含まれるreplyTo
で指定します。また、レスポンスを返す時にプロパティに
correlationId
を含めます。こちらも、リクエストのメッセージのプロパティに含まれるcorrelationId
を指定します。
こうやって、correlationId
でリクエストとレスポンスを紐付け、経路はreplyTo
で決まるというわけですね。
処理を行う前には、スリープを入れるようにしました。
log(` [${this.name}] sleep, 3 sec...`); await setTimeout(3 * 1000);
これで、重い処理を表現しています。チュートリアルではフィボナッチ数の計算をしていますが、それだとCPUを使い切るので
テストで複数サーバーを利用する際に都合が悪く(fork
が必要になる)、スリープにすることにしました。
ちなみに、logというのはこういう定義です。
src/log.ts
export function log(message: string): void { console.log(`[${new Date().toISOString()}] ${message}`); }
こちらはクライアントでも使います。
クライアント(プロデューサー)を作成する
次は、クライアントを作成します。RabbitMQではプロデューサーの役割ですね。
src/Client.ts
import amqp from 'amqplib'; import { Request, Response } from './exchange'; import { log } from './log'; import { randomUUID } from 'crypto'; export class Client { private conn: amqp.Connection; private channel: amqp.Channel; private queue: string; constructor(conn: amqp.Connection, channel: amqp.Channel, queue: string) { this.conn = conn; this.channel = channel; this.queue = queue; } static async create(url: string, queue: string): Promise<Client> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); return new Client(conn, channel, queue); } async send(request: Request): Promise<Response> { const q = await this.channel.assertQueue('', { exclusive: true }); const correlationId = randomUUID(); return new Promise(async (resolve, reject) => { try { await this.channel.consume(q.queue, async (message) => { if (message?.properties.correlationId === correlationId) { const response: Response = JSON.parse(message.content.toString('utf-8')); resolve(response); } }); this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(request)), { correlationId: correlationId, replyTo: q.queue, }); log(`send message, correlationId = ${correlationId}`); } catch (e) { reject(e); } }); } async close(): Promise<void> { await this.conn.close(); } }
こちらで、排他的な匿名キューを作成しています。
const q = await this.channel.assertQueue('', { exclusive: true });
Channel-oriented API reference / API reference / Channel / assertQueue
Channel#assertQueue
の第1引数を空文字にすることで、空文字列('
')で指定しているところがポイントで、
こうするとamq.gen-JzTY20BRgKO-HjmUJj0wLg
のようなランダムな名前でキューが作成されます。 またexclusive
をtrue
に
しているので、これは排他的な一時キューとなり、接続がクローズされると削除されるものになります。
correlationId
はUUIDにしました。
const correlationId = randomUUID();
レスポンスを受信している箇所。レスポンスのメッセージのプロパティに含まれるcorrelationId
が、自分で作成したものと同じであれば
自分のレスポンスだと見なすようにしています。
await this.channel.consume(q.queue, async (message) => { if (message?.properties.correlationId === correlationId) { const response: Response = JSON.parse(message.content.toString('utf-8')); resolve(response); } });
リクエストを送信している部分。
this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(request)), { correlationId: corrationId, replyTo: q.queue, });
プロパティとして、correlationId
に作成したUUIDを、replyTo
に一時キューの名前を渡しています。
これで、クライアントとサーバーの準備は完了です。
テストコードで確認する
確認は、テストコードで行います。
test/rpc.test.ts
import { Client } from '../src/Client'; import { Server } from '../src/Server'; import { Request, Response } from '../src/exchange'; test('rpc test', async () => { const url = 'amqp://kazuhira:password@172.17.0.2:5672'; const queue = 'rpc_queue'; const server1 = await Server.create('server1', url, queue); const server2 = await Server.create('server2', url, queue); const client = await Client.create(url, queue); try { await server1.start(); await server2.start(); const request1: Request = { a: 5, b: 8, }; const request2: Request = { a: 2, b: 3, }; const promise1 = client.send(request1); const promise2 = client.send(request2); const response1: Response = await promise1; const response2: Response = await promise2; expect(response1).toStrictEqual({ result: 13 }); expect(response2).toStrictEqual({ result: 5 }); } finally { await client.close(); await server1.close(); await server2.close(); } });
サーバーのインスタンスを2つ、クライアントのインスタンスをひとつ作成。
const server1 = await Server.create('server1', url, queue); const server2 = await Server.create('server2', url, queue); const client = await Client.create(url, queue);
サーバー起動後、クライアントから2つメッセージを送信して結果を確認します。
await server1.start(); await server2.start(); const request1: Request = { a: 5, b: 8, }; const request2: Request = { a: 2, b: 3, }; const promise1 = client.send(request1); const promise2 = client.send(request2); const response1: Response = await promise1; const response2: Response = await promise2; expect(response1).toStrictEqual({ result: 13 }); expect(response2).toStrictEqual({ result: 5 });
確認。
$ npm test
実行時のログはこちら。
> rpc@1.0.0 test > jest console.log [2023-12-02T15:23:32.980Z] [server1] [x] Start Server, Awaiting RPC requests at log (src/log.ts:24:11) console.log [2023-12-02T15:23:33.035Z] [server2] [x] Start Server, Awaiting RPC requests at log (src/log.ts:24:11) console.log [2023-12-02T15:23:33.058Z] send message, correlationId = 69a5198e-10c4-4e47-86ed-16087f36a4e6 at log (src/log.ts:24:11) console.log [2023-12-02T15:23:33.083Z] send message, correlationId = ff2aedcf-ca35-45c6-abaf-715c8b434198 at log (src/log.ts:24:11) console.log [2023-12-02T15:23:33.090Z] [server1] sleep, 3 sec... at log (src/log.ts:24:11) console.log [2023-12-02T15:23:33.125Z] [server2] sleep, 3 sec... at log (src/log.ts:24:11) PASS test/rpc.test.ts ✓ rpc test (3396 ms) Test Suites: 1 passed, 1 total Tests: 1 passed, 1 total Snapshots: 0 total Time: 3.725 s, estimated 6 s Ran all test suites.
サーバーはリクエストを受け付けてから3秒スリープするのですが、全体の実行時間は4秒弱なので、処理自体は複数のサーバーインスタンスで
行えています。
それはログからもわかりますね。
console.log [2023-12-02T15:23:33.090Z] [server1] sleep, 3 sec... at log (src/log.ts:24:11) console.log [2023-12-02T15:23:33.125Z] [server2] sleep, 3 sec... at log (src/log.ts:24:11)
メッセージごとに異なるサーバーインスタンスが受信していることも確認できました。
OKですね。
おわりに
RabbitMQのチュートリアルの「RPC」を試してみました。
今までと少し毛色が違うのでなかなか興味深かったのと、少し考え方が広がる構成かなと思いますね。
今回で、Node.js(TypeScript/JavaScript)でRabbitMQのチュートリアルをなぞっていくのは最後にしたいと思います。
始めてから半年くらいかかっていますが、いったん完了(?)ですね。