これは、なにをしたくて書いたもの?
この前、Ubuntu Linux 22.04 LTSにRabbitMQをインストールしてみました。
Ubuntu Linux 22.04 LTSにRabbitMQをインストールする - CLOVER🍀
RabbitMQを見直すのとTypeScriptの学習も兼ねて、JavaScriptクライアントを使ったRabbitMQのチュートリアルをやってみたいと
思います。
RabbitMQのJavaScript/Node.jsクライアント
今回、対象としたいチュートリアルはこちらです。
RabbitMQ tutorial - "Hello World!" — RabbitMQ
こちらのチュートリアルでは、amqplibが使われています。
amqplib | AMQP 0-9-1 library and client for Node.JS
他のJavaScriptクライアントの選択肢はないのかな?と思ったのですが、ドキュメントに記載がありました。
Clients Libraries and Developer Tools / JavaScript and Node
- amqplib: RabbitMQ (AMQP 0-9-1) client for Node.js
- amqp-client: High performance client for both NodeJS and browsers (WebSocket), written in TypeScript
- rabbit.js: message patterns in node.js using RabbitMQ.
- rabbitmq-stream-js-client: RabbitMQ Stream NodeJS client.
- amqp-stats: a node.js interface for RabbitMQ management statistics
- Rascal: a config driven wrapper for amqp.node supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption and channel pooling.
- node-rabbitmq-client: RabbitMQ (AMQP 0-9-1) client library with auto-reconnect, zero dependencies, TypeScript support, and Promise-based API.
ひととおり見てみましたが、amqplibで良さそうです。
DefinitelyTypedもあるみたいですし。
では、amqplibとTypeScriptを使ってチュートリアルを進めてみましょう。
環境
今回の環境は、こちら。
$ sudo -u rabbitmq rabbitmqctl version 3.11.10
RabbitMQは、172.17.0.2で動作しているものとします。
ユーザーも作成しておきます。
$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password $ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*' $ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring
使用するNode.jsのバージョン。
$ node --version v18.15.0 $ npm --version 9.5.0
準備
まずはNode.jsプロジェクトを作成します。
$ npm init -y $ npm i -D @types/node@v18 $ npm i -D typescript $ npm i -D prettier $ npm i -D jest @types/jest $ npm i -D esbuild esbuild-jest
確認はテストコードで行うことにします。なので、Jestもインストール。
amqplibと型定義をインストール。
$ npm i amqplib $ npm i -D @types/amqplib
テストコード配置用のディレクトリを作成。
$ mkdir test
依存関係は、こうなりました。
"devDependencies": { "@types/amqplib": "^0.10.1", "@types/jest": "^29.4.0", "@types/node": "^18.15.0", "esbuild": "^0.17.11", "esbuild-jest": "^0.5.0", "jest": "^29.5.0", "prettier": "^2.8.4", "typescript": "^4.9.5" }, "dependencies": { "amqplib": "^0.10.3" }
scripts
。
"scripts": { "build": "tsc --project .", "build:watch": "tsc --project . --watch", "typecheck": "tsc --project ./tsconfig.typecheck.json", "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch", "test": "jest", "format": "prettier --write src test" },
各種設定ファイル。
tsconfig.json
{ "compilerOptions": { "target": "esnext", "module": "commonjs", "moduleResolution": "node", "lib": ["esnext"], "baseUrl": "./src", "outDir": "dist", "strict": true, "forceConsistentCasingInFileNames": true, "noFallthroughCasesInSwitch": true, "noImplicitOverride": true, "noImplicitReturns": true, "noPropertyAccessFromIndexSignature": true, "esModuleInterop": true }, "include": [ "src" ] }
tsconfig.typecheck.json
{ "extends": "./tsconfig", "compilerOptions": { "baseUrl": "./", "noEmit": true }, "include": [ "src", "test" ] }
.prettierrc.json
{ "singleQuote": true, "printWidth": 120 }
jest.config.js
module.exports = { testEnvironment: 'node', transform: { "^.+\\.tsx?$": "esbuild-jest" } };
RabbitMQのJavaScriptチュートリアルの「Hello World!」をTypeScriptとJestで試す
では、こちらをTypeScriptとJestで試していきます。
RabbitMQ tutorial - "Hello World!" — RabbitMQ
簡単なメッセージ送信、受信のプログラムですね。
amqplibの使い方は、こちらも合わせて見ていきます。
amqplib | AMQP 0-9-1 library and client for Node.JS
amqplib | Channel API reference
作成したソースコードは、こちら。
test/send-receive.test.ts
import amqp from 'amqplib'; import { setTimeout } from 'timers/promises'; test('send messege, receive message', async () => { const queue = 'hello'; async function send(): Promise<void> { const conn = await amqp.connect('amqp://kazuhira:password@172.17.0.2:5672'); try { const channel = await conn.createChannel(); await channel.assertQueue(queue, { durable: false }); const message = 'Hello World!!'; channel.sendToQueue(queue, Buffer.from(message)); await setTimeout(2 * 1000); // send wait } finally { await conn.close(); } } await send(); async function receive(): Promise<string> { const conn = await amqp.connect('amqp://kazuhira:password@172.17.0.2:5672'); const channel = await conn.createChannel(); await channel.assertQueue(queue, { durable: false }); return new Promise<string>(async (resolve) => { await channel.consume(queue, (message) => { resolve(message!.content.toString()); }); await conn.close(); }); } const receivedMessage = await receive(); expect(receivedMessage).toBe('Hello World!!'); });
簡単に説明しましょう。
まずはメッセージ送信部。
async function send(): Promise<void> { const conn = await amqp.connect('amqp://kazuhira:password@172.17.0.2:5672'); try { const channel = await conn.createChannel(); await channel.assertQueue(queue, { durable: false }); const message = 'Hello World!!'; channel.sendToQueue(queue, Buffer.from(message)); await setTimeout(2 * 1000); // send wait } finally { await conn.close(); } } await send();
amqp#connect
で、RabbitMQへ接続。
Channel-oriented API reference / API reference / connect
const conn = await amqp.connect('amqp://kazuhira:password@172.17.0.2:5672');
続いてChannel
を作成。
Channel-oriented API reference / API reference / ChannelModel and CallbackModel / createChannel
const channel = await conn.createChannel();
Channel#assertQueue
ではキューの存在確認を行い、存在しない場合は作成します。
Channel-oriented API reference / API reference / Channel / assertQueue
await channel.assertQueue(queue, { durable: false });
durable
をfalse
にすると、ブローカーが再起動するとキューがなくなります。デフォルトはtrue
で、ブローカーが再起動しても
キューは存在し続けます。
その他のオプションは、ドキュメントを参照してください。
そして、キューにメッセージを送信。
Channel-oriented API reference / API reference / Channel / sendToQueue
const message = 'Hello World!!'; channel.sendToQueue(queue, Buffer.from(message));
このメソッドは同期呼び出しのようで、Promise
は返りません。
最後にclose
。
Channel-oriented API reference / API reference / ChannelModel and CallbackModel / close
await conn.close();
ただ、即close
してしまうとRabbitMQにメッセージを送りきれないままクローズしてしまうようなので、直前にスリープを入れています。
await setTimeout(2 * 1000); // send wait
この関数を呼び出して、送信部は終了です。
await send();
続いては、受信側。送信側で送ったメッセージを受信し、アサーションするところまでを目標にします。
async function receive(): Promise<string> { const conn = await amqp.connect('amqp://kazuhira:password@172.17.0.2:5672'); const channel = await conn.createChannel(); await channel.assertQueue(queue, { durable: false }); return new Promise<string>(async (resolve) => { await channel.consume(queue, (message) => { resolve(message!.content.toString()); }); await conn.close(); }); } const receivedMessage = await receive(); expect(receivedMessage).toBe('Hello World!!');
接続の確立やChannel
の作成、キューの宣言は送信時と同じです。
const conn = await amqp.connect('amqp://kazuhira:password@172.17.0.2:5672'); const channel = await conn.createChannel(); await channel.assertQueue(queue, { durable: false });
メッセージの受信は、Channel#consume
で行います。
Channel-oriented API reference / API reference / Channel / consume
await channel.consume(queue, (message) => { resolve(message!.content.toString()); });
メッセージの受信が行われるまで、とりあえず2秒待ってから接続をクローズ。
await setTimeout(2 * 1000); // receive wait await conn.close();
これ全体をPromise
で返すことにしました。
return new Promise<string>(async (resolve) => { await channel.consume(queue, (message) => { resolve(message!.content.toString()); }); await conn.close(); });
あとは、受信して得られたメッセージをアサーションして終了です。
const receivedMessage = await receive(); expect(receivedMessage).toBe('Hello World!!');
確認。
$ npm test > hello-world@1.0.0 test > jest PASS test/send-receive.test.ts ✓ send messege, receive message (2152 ms) Test Suites: 1 passed, 1 total Tests: 1 passed, 1 total Snapshots: 0 total Time: 2.403 s, estimated 3 s Ran all test suites.
OKですね。こんなところでしょうか。
まとめ
RabbitMQのJavaScriptチュートリアルの「Hello World!」を、TypeScriptで試してみました。
Jestでのテストにしつつ、Promise
(async
、await
)を使うように書き直すのも合わせてやりつつ。
ちょうどいい練習になるような気がします。もうちょっと進めてみましょう。