CLOVER🍀

That was when it all began.

RabbitMQのJavaScriptチュートリアルの「Routing」をTypeScriptで試す

これは、なにをしたくて書いたもの?

RabbitMQのチュートリアルをJavaScriptクライアント+TypeScriptでやっていこう、ということで。

今回は「Routing」を扱います。こちらですね。

RabbitMQ tutorial - Routing — RabbitMQ

Routing

「Routing」では、メッセージをあるルールに従って特定のコンシューマーに配信します。

RabbitMQ tutorial - Routing — RabbitMQ

「Publish/Subscribe」では、以下のようにExchangeを作った後にキューをバインディング(関連付け)しました。

channel.bindQueue(q.queue, exchange, '');

これは、「キューが特定のExchangeに関心がある」ということを指しています。

Channel-oriented API reference / API reference / Channel / bindQueue

バインディングを行う時に、追加のキーを指定できます。これをバインディングキーと呼びます。

channel.bindQueue(queue_name, exchange_name, 'black');

バインディングキーは、Exchangeの種類によって意味が変わります。たとえばFanout exchangeだと、この値を無視します。

Exchangeの種類は、こちらですね。

  • Direct exchange
  • Fanout exchange
  • Topic exchange
  • Headers exchange

AMQP 0-9-1 Model Explained / Exchanges and Exchange Types

Fanout exchangeは、すべてのメッセージをすべてのコンシューマーにブロードキャストするものでした。

Direct exchange

Direct exchangeを使った場合、ルーティングキーに従ってメッセージを配信するキューを振り分けることができます。

AMQP 0-9-1 Model Explained / Exchanges and Exchange Types / Direct Exchange

ひとつのExchangeに、複数のキューを紐付けることもできます。

また、同じルーティングキーで複数のキューをバインドすることもできます。

では、ここまでの内容をNode.js+TypeScriptで試してみます。

環境

今回の環境は、こちら。

$ sudo -u rabbitmq rabbitmqctl version
3.12.4

RabbitMQは、172.17.0.2で動作しているものとします。

ユーザーも作成しておきます。

$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password
$ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*'
$ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring

使用するNode.jsのバージョン。

$ node --version
v18.17.1


$ npm --version
9.6.7

準備

では、まずはNode.jsプロジェクトを作成します。

$ npm init -y
$ npm i -D typescript
$ npm i -D @types/node@v18
$ npm i -D prettier
$ npm i -D jest @types/jest
$ npm i -D esbuild esbuild-jest

確認は、テストコードで行います。

RabbitMQに接続するためのamqplibとその型定義をインストール。

$ npm i amqplib
$ npm i -D @types/amqplib

ソースコードを配置するディレクトリを作成。

$ mkdir src test

依存関係は、こうなりました。

  "devDependencies": {
    "@types/amqplib": "^0.10.1",
    "@types/jest": "^29.5.5",
    "@types/node": "^18.17.18",
    "esbuild": "^0.19.3",
    "esbuild-jest": "^0.5.0",
    "jest": "^29.7.0",
    "prettier": "^3.0.3",
    "typescript": "^5.2.2"
  },
  "dependencies": {
    "amqplib": "^0.10.3"
  }

scripts。

  "scripts": {
    "build": "tsc --project .",
    "build:watch": "tsc --project . --watch",
    "typecheck": "tsc --project ./tsconfig.typecheck.json",
    "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch",
    "test": "jest",
    "format": "prettier --write src test"
  },

各種設定ファイル。

tsconfig.json

{
  "compilerOptions": {
    "target": "esnext",
    "module": "commonjs",
    "moduleResolution": "node",
    "lib": ["esnext"],
    "baseUrl": "./src",
    "outDir": "dist",
    "strict": true,
    "forceConsistentCasingInFileNames": true,
    "noFallthroughCasesInSwitch": true,
    "noImplicitOverride": true,
    "noImplicitReturns": true,
    "noPropertyAccessFromIndexSignature": true,
    "esModuleInterop": true
  },
  "include": [
    "src"
  ]
}

tsconfig.typecheck.json

{
  "extends": "./tsconfig",
  "compilerOptions": {
    "baseUrl": "./",
    "noEmit": true
  },
  "include": [
    "src", "test"
  ]
}

.prettierrc.json

{
  "singleQuote": true,
  "printWidth": 120
}

jest.config.js

module.exports = {
  testEnvironment: 'node',
  transform: {
    "^.+\\.tsx?$": "esbuild-jest"
  }
};

RabbitMQのJavaScriptチュートリアルの「Routing」をTypeScriptとJestで試す

では、こちらをTypeScriptとJestで試していきます。

RabbitMQ tutorial - Routing — RabbitMQ

チュートリアル内ではプロデューサーとコンシューマーをそれぞれ別プロセスで起動していますが、今回はテストコード内で表現します。

プロデューサーを作成する

まずはプロデューサーから。クラスとして作成します。

src/Producer.ts

import amqp from 'amqplib';
import { log } from './log';

export class Producer {
  private conn: amqp.Connection;
  private channel: amqp.Channel;
  private exchange: string;

  constructor(conn: amqp.Connection, channel: amqp.Channel, exchange: string) {
    this.conn = conn;
    this.channel = channel;
    this.exchange = exchange;
  }

  static async create(url: string, exchange: string): Promise<Producer> {
    const conn = await amqp.connect(url);
    const channel = await conn.createChannel();

    await channel.assertExchange(exchange, 'direct', {
      durable: false,
    });

    return new Producer(conn, channel, exchange);
  }

  send(routingKey: string, message: string): void {
    this.channel.publish(this.exchange, routingKey, Buffer.from(message));

    log(` [x] Sent ${routingKey}: '${message}'`);
  }

  async close(): Promise<void> {
    await this.conn.close();
  }
}

Direct Exchangeを作成。

    await channel.assertExchange(exchange, 'direct', {
      durable: false,
    });

Channel-oriented API reference / API reference / Channel / assertExchange

メッセージをExchangeに送信する箇所は、こちら。

  send(routingKey: string, message: string): void {
    this.channel.publish(this.exchange, routingKey, Buffer.from(message));

    log(` [x] Sent ${routingKey}: '${message}'`);
  }

Channel#publishの第2引数にルーティングキーを指定します。

Channel-oriented API reference / API reference / Channel / publish

ブロードキャストしている時は、このルーティングキーに空の文字列を指定していました。

ちなみに、logというのはこういう定義です。

src/log.ts

export function log(message: string): void {
  console.log(`[${new Date().toISOString()}] ${message}`);
}

こちらはコンシューマーでも使います。

コンシューマーを作成する

次は、コンシューマーを作成します。

src/Consumer.ts

import amqp from 'amqplib';
import { log } from './log';

export class Consumer {
  private name: string;

  private conn: amqp.Connection;
  private channel: amqp.Channel;
  private exchange: string;
  private queue: string;
  private routingKeys: string[];

  private receivedMessages: string[] = [];

  constructor(
    name: string,
    conn: amqp.Connection,
    channel: amqp.Channel,
    exchange: string,
    queue: string,
    routingKeys: string[],
  ) {
    this.name = name;
    this.conn = conn;
    this.channel = channel;
    this.exchange = exchange;
    this.queue = queue;
    this.routingKeys = routingKeys;
  }

  static async create(name: string, url: string, exchange: string, routingKeys: string[]): Promise<Consumer> {
    const conn = await amqp.connect(url);
    const channel = await conn.createChannel();

    await channel.assertExchange(exchange, 'direct', { durable: false });
    const queue = await channel.assertQueue('', { exclusive: true });

    return new Consumer(name, conn, channel, exchange, queue.queue, routingKeys);
  }

  async start(): Promise<void> {
    for (const routingKey of this.routingKeys) {
      await this.channel.bindQueue(this.queue, this.exchange, routingKey);
    }

    await this.channel.consume(this.queue, async (message) => {
      const messageAsString = message!.content.toString();
      log(`  [${this.name} - ${message?.fields.routingKey}] Received, ${messageAsString}`);

      this.receivedMessages.push(messageAsString);
    }, {
      noAck: true
    });

    log(` [${this.name} - [${this.routingKeys.join(', ')}]] waiting for message, for queue[${this.queue}]`);
  }

  getReceivedMessages(): string[] {
    return [...this.receivedMessages];
  }

  async close(): Promise<void> {
    await this.conn.close();
  }
}

Channel#assertExchangeまではプロデューサーと同じで、その後にChannel#assertQueueを呼び出します。

    await channel.assertExchange(exchange, 'direct', { durable: false });
    const queue = await channel.assertQueue('', { exclusive: true });

Channel-oriented API reference / API reference / Channel / assertQueue

ここで、キューの名前を空文字列('')で指定しているところがポイントで、こうするとamq.gen-JzTY20BRgKO-HjmUJj0wLgのような
ランダムな名前でキューが作成されます。 またexclusiveをtrueにしているので、これは排他的な一時キューとなり、接続がクローズされると削除されるものになります。

そして、キューにバインドします。

    for (const routingKey of this.routingKeys) {
      await this.channel.bindQueue(this.queue, this.exchange, routingKey);
    }

この時、第3引数にルーティングキーを渡すことで、プロデューサーが指定したルーティングキーと一致する場合にこのコンシューマーは
キューからメッセージを受信できます。
なお、今回はひとつのコンシューマーで複数のルーティングキーをバインドできるようにしています。

メッセージの受信。

    await this.channel.consume(this.queue, async (message) => {
      const messageAsString = message!.content.toString();
      log(`  [${this.name} - ${message?.fields.routingKey}] Received, ${messageAsString}`);

      this.receivedMessages.push(messageAsString);
    }, {
      noAck: true
    });

どのルーティングキーによるメッセージなのかは、受信したメッセージの中身でわかるようです。

noAckをtrueにしているので、メッセージを受信したらその時点でブローカーからメッセージを削除します。

これでプロデューサーとコンシューマーの準備は完了です。

テストコードで確認する

確認はテストコードで行います。

test/routing.test.ts

import { setTimeout } from 'node:timers/promises';
import { Consumer } from '../src/Consumer';
import { Producer } from '../src/Producer';

test('routing test', async () => {
  const url = 'amqp://kazuhira:password@172.17.0.2:5672';
  const exchange = 'direct_logs';

  const producer = await Producer.create(url, exchange);

  const infoLevelConsumer = await Consumer.create('info-level-consumer', url, exchange, ['info']);
  const warnLevelConsumer = await Consumer.create('warn-level-consumer', url, exchange, ['warn']);
  const errorLevelConsumer = await Consumer.create('error-level-consumer', url, exchange, ['error']);
  const allLevelConsumer = await Consumer.create('all-level-consumer', url, exchange, ['info', 'warn', 'error']);

  try {
    await infoLevelConsumer.start();
    await warnLevelConsumer.start();
    await errorLevelConsumer.start();
    await allLevelConsumer.start();

    // routing key, info
    producer.send('info', 'Blue');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']);
    expect(warnLevelConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(errorLevelConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']);

    // routing key, warn
    producer.send('warn', 'Green');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']);
    expect(warnLevelConsumer.getReceivedMessages()).toStrictEqual(['Green']);
    expect(errorLevelConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue', 'Green']);

    // routing key, error
    producer.send('error', 'Red');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']);
    expect(warnLevelConsumer.getReceivedMessages()).toStrictEqual(['Green']);
    expect(errorLevelConsumer.getReceivedMessages()).toStrictEqual(['Red']);
    expect(allLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue', 'Green', 'Red']);
  } finally {
    await producer.close();
    await infoLevelConsumer.close();
    await warnLevelConsumer.close();
    await errorLevelConsumer.close();
    await allLevelConsumer.close();
  }
});

第4引数の配列がルーティングキーです。お題はチュートリアルと同じようにログレベルにして、最後のコンシューマーだけすべての
メッセージを受け取るように複数のルーティングキーを指定しています。

  const infoLevelConsumer = await Consumer.create('info-level-consumer', url, exchange, ['info']);
  const warnLevelConsumer = await Consumer.create('warn-level-consumer', url, exchange, ['warn']);
  const errorLevelConsumer = await Consumer.create('error-level-consumer', url, exchange, ['error']);
  const allLevelConsumer = await Consumer.create('all-level-consumer', url, exchange, ['info', 'warn', 'error']);

あとはコンシューマーを開始して、ルーティングキーを指定してメッセージを送り、確認します。

    await infoLevelConsumer.start();
    await warnLevelConsumer.start();
    await errorLevelConsumer.start();
    await allLevelConsumer.start();

    // routing key, info
    producer.send('info', 'Blue');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']);
    expect(warnLevelConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(errorLevelConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']);

最後がこちらですが、各レベルごとにメッセージがコンシューマーに振り分けられ、最後のコンシューマーだけがすべてのメッセージを
受け取ることを確認します。

    // routing key, error
    producer.send('error', 'Red');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue']);
    expect(warnLevelConsumer.getReceivedMessages()).toStrictEqual(['Green']);
    expect(errorLevelConsumer.getReceivedMessages()).toStrictEqual(['Red']);
    expect(allLevelConsumer.getReceivedMessages()).toStrictEqual(['Blue', 'Green', 'Red']);

テストを実行。

$ npm run test

出力されるログは、こんな感じになります。

> routing@1.0.0 test
> jest

  console.log
    [2023-09-21T14:28:48.467Z]  [info-level-consumer - [info]] waiting for message, for queue[amq.gen-PQK-y4D3rWGBwpC0i8E1uA]

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:48.523Z]  [warn-level-consumer - [warn]] waiting for message, for queue[amq.gen-qukTzE39hc3wInQ9kGBNpw]

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:48.529Z]  [error-level-consumer - [error]] waiting for message, for queue[amq.gen-led1Rj7VGecWDwzt8_cz1g]

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:48.538Z]  [all-level-consumer - [info, warn, error]] waiting for message, for queue[amq.gen-H6nNi2vUJ25tgZu3WF4eew]

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:48.544Z]  [x] Sent info: 'Blue'

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:48.551Z]   [info-level-consumer - info] Received, Blue

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:48.556Z]   [all-level-consumer - info] Received, Blue

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:49.558Z]  [x] Sent warn: 'Green'

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:49.563Z]   [warn-level-consumer - warn] Received, Green

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:49.568Z]   [all-level-consumer - warn] Received, Green

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:50.565Z]  [x] Sent error: 'Red'

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:50.569Z]   [error-level-consumer - error] Received, Red

      at log (src/log.ts:24:11)

  console.log
    [2023-09-21T14:28:50.573Z]   [all-level-consumer - error] Received, Red

      at log (src/log.ts:24:11)

 PASS  test/routing.test.ts
  ✓ routing test (3489 ms)

Test Suites: 1 passed, 1 total
Tests:       1 passed, 1 total
Snapshots:   0 total
Time:        3.73 s, estimated 4 s
Ran all test suites.

OKですね。

また、テスト実行中に以下のコマンドを実行してみます。

$ sudo -u rabbitmq rabbitmqctl list_exchanges
$ sudo -u rabbitmq rabbitmqctl list_queues
$ sudo -u rabbitmq rabbitmqctl list_bindings

結果。

$ sudo -u rabbitmq rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.rabbitmq.trace      topic
        direct
amq.direct      direct
amq.match       headers
direct_logs     direct
amq.topic       topic
amq.headers     headers
amq.fanout      fanout


$ sudo -u rabbitmq rabbitmqctl list_queues
Listing queues for vhost / ...
name    messages
amq.gen-SeeoxoV2bpfFOWpimilItg  0
amq.gen-qW51Nipw2X9KE5IUCfVKGw  0
amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  0
amq.gen-Nitvmj1v32HsLLQ0jRm6JQ  0


$ sudo -u rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-SeeoxoV2bpfFOWpimilItg  queue   amq.gen-SeeoxoV2bpfFOWpimilItg  []
        exchange        amq.gen-qW51Nipw2X9KE5IUCfVKGw  queue   amq.gen-qW51Nipw2X9KE5IUCfVKGw  []
        exchange        amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  queue   amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  []
        exchange        amq.gen-Nitvmj1v32HsLLQ0jRm6JQ  queue   amq.gen-Nitvmj1v32HsLLQ0jRm6JQ  []
direct_logs     exchange        amq.gen-SeeoxoV2bpfFOWpimilItg  queue   error   []
direct_logs     exchange        amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  queue   error   []
direct_logs     exchange        amq.gen-Nitvmj1v32HsLLQ0jRm6JQ  queue   info    []
direct_logs     exchange        amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  queue   info    []
direct_logs     exchange        amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  queue   warn    []
direct_logs     exchange        amq.gen-qW51Nipw2X9KE5IUCfVKGw  queue   warn    []

Direct Exchangeがあり

direct_logs     direct

一時キューがあり

amq.gen-SeeoxoV2bpfFOWpimilItg  0
amq.gen-qW51Nipw2X9KE5IUCfVKGw  0
amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  0
amq.gen-Nitvmj1v32HsLLQ0jRm6JQ  0

Exchangeと一時キューの紐付けも確認できます。

source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-SeeoxoV2bpfFOWpimilItg  queue   amq.gen-SeeoxoV2bpfFOWpimilItg  []
        exchange        amq.gen-qW51Nipw2X9KE5IUCfVKGw  queue   amq.gen-qW51Nipw2X9KE5IUCfVKGw  []
        exchange        amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  queue   amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  []
        exchange        amq.gen-Nitvmj1v32HsLLQ0jRm6JQ  queue   amq.gen-Nitvmj1v32HsLLQ0jRm6JQ  []
direct_logs     exchange        amq.gen-SeeoxoV2bpfFOWpimilItg  queue   error   []
direct_logs     exchange        amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  queue   error   []
direct_logs     exchange        amq.gen-Nitvmj1v32HsLLQ0jRm6JQ  queue   info    []
direct_logs     exchange        amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  queue   info    []
direct_logs     exchange        amq.gen-Y3RxBoKwYSyFxk3O_b9AFg  queue   warn    []
direct_logs     exchange        amq.gen-qW51Nipw2X9KE5IUCfVKGw  queue   warn    []

ルーティングキーもわかりますね。

こんなところでしょうか。

おわりに

RabbitMQのJavaScriptチュートリアルの「Routing」を、TypeScriptで試してみました。

けっこう久しぶりにやったのでだいぶ忘れていたのですが、前に書いたエントリーなどを見ながらいろいろ思い出せたので、割とあっさり
確認できました。

とりあえず、チュートリアルはひととおり通したいなと思います。