CLOVER🍀

That was when it all began.

RabbitMQのJavaScriptチュートリアルの「RPC」をTypeScriptで試す

これは、なにをしたくて書いたもの?

RabbitMQのチュートリアルJavaScriptクライアント+TypeScriptでやっていこう、ということで。

今回は「RPC」を扱います。

RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ

今回で、この一連のお題は最後にします。

RPC

「Work Queues」チュートリアルでは、ワークキューを使って時間のかかるタスクを複数のコンシューマー(ワーカー)に分散して
処理を行ってもらうものでした。

RabbitMQ tutorial - Work Queues — RabbitMQ

RabbitMQのJavaScriptチュートリアルの「Work Queues」をTypeScriptで試す - CLOVER🍀

今回は、ワーカー側で処理を実行してその結果を受け取るというパターンです。なので「RPC」ですね。

こんな感じで実現するようです。

  • 2つのキューを使用する
  • そのうちの片方は、RPCワーカー(サーバー)に対して、プロデューサー(クライアント)ごとに作成する匿名の排他的キュー
    • コールバックキュー
  • RPCリクエストを送る場合、クライアントはコールバックキューを指定するreply_to、どのリクエストに関するものなのかを特定するためのcorrelation_idの2つのプロパティを指定する
  • クライアントは、キュー(コールバックキューではない方)にリクエストを送る
  • RPCワーカーは、キューからリクエストを受け取るのを待ち、処理を終えるとreply_toで指定されたキューにレスポンスを返す
  • クライアントはコールバックキューでデータを待ち、受け取った場合はcorrelation_idを確認し、リクエストで送信した値と一致する場合はリクエストに対するレスポンスと見なす

なお、RPCのチュートリアルページでは、以下の点をRPCの注意事項として記載しています。

  • 呼び出している処理が実はローカルの関数呼び出しではなく、RPCであるかもしれないことに注意すること(特にプログラマーが気づいていない場合)
  • システムを複雑化し、デバッグをしづらくする
  • RPCの乱用は、ソフトウェアのシンプルさを失わせメンテナンスの難しいスパゲッティコードを生み出すかもしれない

関数呼び出し自体はプログラミングで一般的ですが、実はそれがリモート呼び出しだった場合はその事実が隠されやすいので、
注意しましょうということですね。

では、このあたりの動作をNode.js+TypeScriptで試してみます。

環境

今回の環境は、こちら。

$ sudo -u rabbitmq rabbitmqctl version
3.12.10

RabbitMQは、172.17.0.2で動作しているものとします。

ユーザーの作成。

$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password
$ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*'
$ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring

使用するNode.jsのバージョン。

$ node --version
v18.19.0


$ npm --version
10.2.3

準備

まずはNode.jsプロジェクトを作成。

$ npm init -y
$ npm i -D typescript
$ npm i -D @types/node@v18
$ npm i -D prettier
$ npm i -D jest @types/jest
$ npm i -D esbuild esbuild-jest

確認は、テストコードで行います。

RabbitMQに接続するためのamqplibとその型定義をインストール。

$ npm i amqplib
$ npm i -D @types/amqplib

ソースコードを配置するディレクトリを作成。

$ mkdir src test

依存関係はこうなりました。

  "devDependencies": {
    "@types/amqplib": "^0.10.4",
    "@types/jest": "^29.5.10",
    "@types/node": "^18.19.1",
    "esbuild": "^0.19.8",
    "esbuild-jest": "^0.5.0",
    "jest": "^29.7.0",
    "prettier": "^3.1.0",
    "typescript": "^5.3.2"
  },
  "dependencies": {
    "amqplib": "^0.10.3"
  }

scripts

  "scripts": {
    "build": "tsc --project .",
    "build:watch": "tsc --project . --watch",
    "typecheck": "tsc --project ./tsconfig.typecheck.json",
    "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch",
    "test": "jest",
    "format": "prettier --write src test"
  },

各種設定ファイル。

tsconfig.json

{
  "compilerOptions": {
    "target": "esnext",
    "module": "commonjs",
    "moduleResolution": "node",
    "lib": ["esnext"],
    "baseUrl": "./src",
    "outDir": "dist",
    "strict": true,
    "forceConsistentCasingInFileNames": true,
    "noFallthroughCasesInSwitch": true,
    "noImplicitOverride": true,
    "noImplicitReturns": true,
    "noPropertyAccessFromIndexSignature": true,
    "esModuleInterop": true
  },
  "include": [
    "src"
  ]
}

tsconfig.typecheck.json

{
  "extends": "./tsconfig",
  "compilerOptions": {
    "baseUrl": "./",
    "noEmit": true
  },
  "include": [
    "src", "test"
  ]
}

.prettierrc.json

{
  "singleQuote": true,
  "printWidth": 120
}

jest.config.js

module.exports = {
  testEnvironment: 'node',
  transform: {
    "^.+\\.tsx?$": "esbuild-jest"
  }
};

RabbitMQのJavaScriptチュートリアルの「RPC」をTypeScriptとJestで試す

では、こちらをTypeScriptとJestで試していきます。

RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ

チュートリアル内ではクライアントとサーバーをそれぞれ別プロセスで起動していますが、今回はテストコード内で表現して みます。

お題は加算にしましょう。以下をリクエストとレスポンスの型定義とします。

src/exchange.ts

export type Request = {
  a: number;
  b: number;
};

export type Response = {
  result: number;
};

クライアントで加算元の数を送信し、サーバーで加算してレスポンスとして返すものとします。

サーバー(コンシューマー)を作成する

最初にサーバーを作成します。RabbitMQではコンシューマーやワーカーと呼ばれる種類に相当します。

src/Server.ts

import { setTimeout } from 'node:timers/promises';
import amqp from 'amqplib';
import { Request, Response } from './exchange';
import { log } from './log';

export class Server {
  private name: string;
  private conn: amqp.Connection;
  private channel: amqp.Channel;
  private queue: string;

  constructor(name: string, conn: amqp.Connection, channel: amqp.Channel, queue: string) {
    this.name = name;
    this.conn = conn;
    this.channel = channel;
    this.queue = queue;
  }

  static async create(name: string, url: string, queue: string): Promise<Server> {
    const conn = await amqp.connect(url);
    const channel = await conn.createChannel();

    await channel.assertQueue(queue, { durable: false });
    channel.prefetch(1);

    return new Server(name, conn, channel, queue);
  }

  async start(): Promise<void> {
    log(` [${this.name}] [x] Start Server, Awaiting RPC requests`);

    await this.channel.consume(this.queue, async (message) => {
      if (message !== null) {
        log(` [${this.name}] sleep, 3 sec...`);

        await setTimeout(3 * 1000);

        const request: Request = JSON.parse(message?.content.toString('utf-8'));
        const response: Response = {
          result: request.a + request.b,
        };

        this.channel.sendToQueue(message?.properties.replyTo, Buffer.from(JSON.stringify(response)), {
          correlationId: message?.properties.correlationId,
        });

        this.channel.ack(message);
      }
    });
  }

  async close(): Promise<void> {
    await this.conn.close();
  }
}

キューを作成。

    await channel.assertQueue(queue, { durable: false });
    channel.prefetch(1);

Channel-oriented API reference / API reference / Channel / assertQueue

このキューが、サーバーとしてリクエストを受け取るためのキューになります。複数のサーバーでリクエストをできる限り均等に
処理するようにするには、Channel#prefetchを指定するとよいそうです。

Channel-oriented API reference / API reference / Channel / prefetch

そして、キューからメッセージを受信したら、処理を行ってChannel#sendToQueueでクライアントとの排他的キューにレスポンスを
送信します。

    await this.channel.consume(this.queue, async (message) => {
      if (message !== null) {
        log(` [${this.name}] sleep, 3 sec...`);

        await setTimeout(3 * 1000);

        const request: Request = JSON.parse(message?.content.toString('utf-8'));
        const response: Response = {
          result: request.a + request.b,
        };

        this.channel.sendToQueue(message?.properties.replyTo, Buffer.from(JSON.stringify(response)), {
          correlationId: message?.properties.correlationId,
        });

        this.channel.ack(message);
      }
    });

Channel-oriented API reference / API reference / Channel / sendToQueu

この部分ですね。

        this.channel.sendToQueue(message?.properties.replyTo, Buffer.from(JSON.stringify(response)), {
          correlationId: message?.properties.correlationId,
        });

送信先のキューは、リクエストのメッセージのプロパティに含まれるreplyToで指定します。また、レスポンスを返す時にプロパティに
correlationIdを含めます。こちらも、リクエストのメッセージのプロパティに含まれるcorrelationIdを指定します。

こうやって、correlationIdでリクエストとレスポンスを紐付け、経路はreplyToで決まるというわけですね。

処理を行う前には、スリープを入れるようにしました。

        log(` [${this.name}] sleep, 3 sec...`);

        await setTimeout(3 * 1000);

これで、重い処理を表現しています。チュートリアルではフィボナッチ数の計算をしていますが、それだとCPUを使い切るので
テストで複数サーバーを利用する際に都合が悪く(forkが必要になる)、スリープにすることにしました。

ちなみに、logというのはこういう定義です。

src/log.ts

export function log(message: string): void {
  console.log(`[${new Date().toISOString()}] ${message}`);
}

こちらはクライアントでも使います。

クライアント(プロデューサー)を作成する

次は、クライアントを作成します。RabbitMQではプロデューサーの役割ですね。

src/Client.ts

import amqp from 'amqplib';
import { Request, Response } from './exchange';
import { log } from './log';
import { randomUUID } from 'crypto';

export class Client {
  private conn: amqp.Connection;
  private channel: amqp.Channel;
  private queue: string;

  constructor(conn: amqp.Connection, channel: amqp.Channel, queue: string) {
    this.conn = conn;
    this.channel = channel;
    this.queue = queue;
  }

  static async create(url: string, queue: string): Promise<Client> {
    const conn = await amqp.connect(url);
    const channel = await conn.createChannel();

    return new Client(conn, channel, queue);
  }

  async send(request: Request): Promise<Response> {
    const q = await this.channel.assertQueue('', { exclusive: true });

    const correlationId = randomUUID();

    return new Promise(async (resolve, reject) => {
      try {
        await this.channel.consume(q.queue, async (message) => {
          if (message?.properties.correlationId === correlationId) {
            const response: Response = JSON.parse(message.content.toString('utf-8'));
            resolve(response);
          }
        });

        this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(request)), {
          correlationId: correlationId,
          replyTo: q.queue,
        });

        log(`send message, correlationId = ${correlationId}`);
      } catch (e) {
        reject(e);
      }
    });
  }

  async close(): Promise<void> {
    await this.conn.close();
  }
}

こちらで、排他的な匿名キューを作成しています。

    const q = await this.channel.assertQueue('', { exclusive: true });

Channel-oriented API reference / API reference / Channel / assertQueue

Channel#assertQueueの第1引数を空文字にすることで、空文字列('')で指定しているところがポイントで、
こうするとamq.gen-JzTY20BRgKO-HjmUJj0wLgのようなランダムな名前でキューが作成されます。 またexclusivetrue
しているので、これは排他的な一時キューとなり、接続がクローズされると削除されるものになります。

correlationIdはUUIDにしました。

    const correlationId = randomUUID();

レスポンスを受信している箇所。レスポンスのメッセージのプロパティに含まれるcorrelationIdが、自分で作成したものと同じであれば
自分のレスポンスだと見なすようにしています。

        await this.channel.consume(q.queue, async (message) => {
          if (message?.properties.correlationId === correlationId) {
            const response: Response = JSON.parse(message.content.toString('utf-8'));
            resolve(response);
          }
        });

リクエストを送信している部分。

        this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(request)), {
          correlationId: corrationId,
          replyTo: q.queue,
        });

プロパティとして、correlationIdに作成したUUIDを、replyToに一時キューの名前を渡しています。

これで、クライアントとサーバーの準備は完了です。

テストコードで確認する

確認は、テストコードで行います。

test/rpc.test.ts

import { Client } from '../src/Client';
import { Server } from '../src/Server';
import { Request, Response } from '../src/exchange';

test('rpc test', async () => {
  const url = 'amqp://kazuhira:password@172.17.0.2:5672';
  const queue = 'rpc_queue';

  const server1 = await Server.create('server1', url, queue);
  const server2 = await Server.create('server2', url, queue);

  const client = await Client.create(url, queue);

  try {
    await server1.start();
    await server2.start();

    const request1: Request = {
      a: 5,
      b: 8,
    };
    const request2: Request = {
      a: 2,
      b: 3,
    };

    const promise1 = client.send(request1);
    const promise2 = client.send(request2);

    const response1: Response = await promise1;
    const response2: Response = await promise2;

    expect(response1).toStrictEqual({ result: 13 });
    expect(response2).toStrictEqual({ result: 5 });
  } finally {
    await client.close();
    await server1.close();
    await server2.close();
  }
});

サーバーのインスタンスを2つ、クライアントのインスタンスをひとつ作成。

  const server1 = await Server.create('server1', url, queue);
  const server2 = await Server.create('server2', url, queue);

  const client = await Client.create(url, queue);

サーバー起動後、クライアントから2つメッセージを送信して結果を確認します。

    await server1.start();
    await server2.start();

    const request1: Request = {
      a: 5,
      b: 8,
    };
    const request2: Request = {
      a: 2,
      b: 3,
    };

    const promise1 = client.send(request1);
    const promise2 = client.send(request2);

    const response1: Response = await promise1;
    const response2: Response = await promise2;

    expect(response1).toStrictEqual({ result: 13 });
    expect(response2).toStrictEqual({ result: 5 });

確認。

$ npm test

実行時のログはこちら。

> rpc@1.0.0 test
> jest

  console.log
    [2023-12-02T15:23:32.980Z]  [server1] [x] Start Server, Awaiting RPC requests

      at log (src/log.ts:24:11)

  console.log
    [2023-12-02T15:23:33.035Z]  [server2] [x] Start Server, Awaiting RPC requests

      at log (src/log.ts:24:11)

  console.log
    [2023-12-02T15:23:33.058Z] send message, correlationId = 69a5198e-10c4-4e47-86ed-16087f36a4e6

      at log (src/log.ts:24:11)

  console.log
    [2023-12-02T15:23:33.083Z] send message, correlationId = ff2aedcf-ca35-45c6-abaf-715c8b434198

      at log (src/log.ts:24:11)

  console.log
    [2023-12-02T15:23:33.090Z]  [server1] sleep, 3 sec...

      at log (src/log.ts:24:11)

  console.log
    [2023-12-02T15:23:33.125Z]  [server2] sleep, 3 sec...

      at log (src/log.ts:24:11)

 PASS  test/rpc.test.ts
  ✓ rpc test (3396 ms)

Test Suites: 1 passed, 1 total
Tests:       1 passed, 1 total
Snapshots:   0 total
Time:        3.725 s, estimated 6 s
Ran all test suites.

サーバーはリクエストを受け付けてから3秒スリープするのですが、全体の実行時間は4秒弱なので、処理自体は複数のサーバーインスタンス
行えています。

それはログからもわかりますね。

  console.log
    [2023-12-02T15:23:33.090Z]  [server1] sleep, 3 sec...

      at log (src/log.ts:24:11)

  console.log
    [2023-12-02T15:23:33.125Z]  [server2] sleep, 3 sec...

      at log (src/log.ts:24:11)

メッセージごとに異なるサーバーインスタンスが受信していることも確認できました。

OKですね。

おわりに

RabbitMQのチュートリアルの「RPC」を試してみました。

今までと少し毛色が違うのでなかなか興味深かったのと、少し考え方が広がる構成かなと思いますね。

今回で、Node.js(TypeScript/JavaScript)でRabbitMQのチュートリアルをなぞっていくのは最後にしたいと思います。
始めてから半年くらいかかっていますが、いったん完了(?)ですね。