CLOVER🍀

That was when it all began.

RabbitMQのJavaScriptチュートリアルの「Topics」をTypeScriptで試す

これは、なにをしたくて書いたもの?

RabbitMQのチュートリアルをJavaScriptクライアント+TypeScriptでやっていこう、ということで。

今回は「Topics」を扱います。こちらですね。

RabbitMQ tutorial - Topics — RabbitMQ

Topics

Fanout Exchangeでブロードキャストを、Direct Exchangeでルーティングが行えることを過去2つのチュートリアルで扱いました。

RabbitMQのJavaScriptチュートリアルの「Publish/Subscribe」をTypeScriptで試す - CLOVER🍀

RabbitMQのJavaScriptチュートリアルの「Routing」をTypeScriptで試す - CLOVER🍀

これらに対して、複数の条件でルーティングを行いたいという時に使うのがTopic Exchangeのようです。

AMQP 0-9-1 Model Explained / Exchanges and Exchange Types

RabbitMQ tutorial - Topics — RabbitMQ

topicは、ルーティングキーに.で区切られた単語を含める必要があるExchangeです。

AMQP 0-9-1 Model Explained / Exchanges and Exchange Types / Topic Exchange

例としてはstock.usd.nyse、nyse.vmw、quick.orange.rabbitといったもので、255バイト以内の長さであればOKです。

バインディングキーも同じ形式である必要があり、あるルーティングキーで送信されたメッセージは、一致するバインディングキーで
バインドされているすべてのキューに配信されます。

ルーティングキーとバインディングキーの差がわかりにくいですが、送信側で使うのがルーティングキー、受信側で使うのが
バインディングキーという位置づけのようです。

なお、これらのキーに対しては2種類のワイルドカードが定義されています。

  • * … ひとつの単語の代わりに使用できる
  • # … 0以上の単語を置き換えることができる

*.orange.*やlazy.#が例になります。

いずれのバインディングにも一致しなかったメッセージは破棄されるようです。複数のバインディングと一致する場合は、最後の
バインディングとなるキューに配信されるようです。

このあたりは、実際の動作を見た方がよさそうですね。

また、Topic Exchangeはバインディングキーの指定によっては、他のExchangeと同じように使えるようです。

バインディングキーを#のみとした場合は、Fanout Exchangeと同じ動作になるようです。ワイルドカードを含まない
バインディングキーとした場合は、Direct Exchangeと同じ動作になるようです。

では、このあたりの動作をNode.js+TypeScriptで試してみます。

環境

今回の環境は、こちら。

$ sudo -u rabbitmq rabbitmqctl version
3.12.8

RabbitMQは、172.17.0.2で動作しているものとします。

$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password
$ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*'
$ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring

使用するNode.jsのバージョン。

$ node --version
v18.18.2


$ npm --version
9.8.1

準備

まずはNode.jsプロジェクトを作成。

$ npm init -y
$ npm i -D typescript
$ npm i -D @types/node@v18
$ npm i -D prettier
$ npm i -D jest @types/jest
$ npm i -D esbuild esbuild-jest

確認は、テストコードで行います。

RabbitMQに接続するためのamqplibとその型定義をインストール。

$ npm i amqplib
$ npm i -D @types/amqplib

ソースコードを配置するディレクトリを作成。

$ mkdir src test

依存関係は、こうなりました。

  "devDependencies": {
    "@types/amqplib": "^0.10.4",
    "@types/jest": "^29.5.8",
    "@types/node": "^18.18.9",
    "esbuild": "^0.19.5",
    "esbuild-jest": "^0.5.0",
    "jest": "^29.7.0",
    "prettier": "^3.0.3",
    "typescript": "^5.2.2"
  },
  "dependencies": {
    "amqplib": "^0.10.3"
  }

scripts。

  "scripts": {
    "build": "tsc --project .",
    "build:watch": "tsc --project . --watch",
    "typecheck": "tsc --project ./tsconfig.typecheck.json",
    "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch",
    "test": "jest",
    "format": "prettier --write src test"
  },

各種設定ファイル。

tsconfig.json

{
  "compilerOptions": {
    "target": "esnext",
    "module": "commonjs",
    "moduleResolution": "node",
    "lib": ["esnext"],
    "baseUrl": "./src",
    "outDir": "dist",
    "strict": true,
    "forceConsistentCasingInFileNames": true,
    "noFallthroughCasesInSwitch": true,
    "noImplicitOverride": true,
    "noImplicitReturns": true,
    "noPropertyAccessFromIndexSignature": true,
    "esModuleInterop": true
  },
  "include": [
    "src"
  ]
}

tsconfig.typecheck.json

{
  "extends": "./tsconfig",
  "compilerOptions": {
    "baseUrl": "./",
    "noEmit": true
  },
  "include": [
    "src", "test"
  ]
}

.prettierrc.json

{
  "singleQuote": true,
  "printWidth": 120
}

jest.config.js

module.exports = {
  testEnvironment: 'node',
  transform: {
    "^.+\\.tsx?$": "esbuild-jest"
  }
};

RabbitMQのJavaScriptチュートリアルの「Topics」をTypeScriptとJestで試す

では、こちらをTypeScriptとJestで試していきます。

RabbitMQ tutorial - Topics — RabbitMQ

チュートリアル内ではプロデューサーとコンシューマーをそれぞれ別プロセスで起動していますが、今回はテストコード内で表現して
みます。

プロデューサーを作成する

まずはプロデューサーから。クラスとして作成します。

src/Producer.ts

import amqp from 'amqplib';
import { log } from './log';

export class Producer {
  private conn: amqp.Connection;
  private channel: amqp.Channel;
  private exchange: string;

  constructor(conn: amqp.Connection, channel: amqp.Channel, exchange: string) {
    this.conn = conn;
    this.channel = channel;
    this.exchange = exchange;
  }

  static async create(url: string, exchange: string): Promise<Producer> {
    const conn = await amqp.connect(url);
    const channel = await conn.createChannel();

    await channel.assertExchange(exchange, 'topic', { durable: false });

    return new Producer(conn, channel, exchange);
  }

  send(routingKey: string, message: string): void {
    this.channel.publish(this.exchange, routingKey, Buffer.from(message));

    log(` [x] Send '${message}'`);
  }

  async close(): Promise<void> {
    await this.conn.close();
  }
}

Topic Exchangeを作成。

  static async create(url: string, exchange: string): Promise<Producer> {
    const conn = await amqp.connect(url);
    const channel = await conn.createChannel();

    await channel.assertExchange(exchange, 'topic', { durable: false });

    return new Producer(conn, channel, exchange);
  }

Channel-oriented API reference / API reference / Channel / assertExchange

メッセージをExchangeに送信する箇所は、こちら。

  send(routingKey: string, message: string): void {
    this.channel.publish(this.exchange, routingKey, Buffer.from(message));

    log(` [x] Send '${message}'`);
  }

Channel#publishの第2引数にルーティングキーを指定します。

Channel-oriented API reference / API reference / Channel / publish

このルーティングキーで、メッセージを受信するコンシューマーが決まります。ルーティングキーにマッチするコンシューマーは、
複数あってかまいません。

ちなみに、logというのはこういう定義です。

src/log.ts

export function log(message: string): void {
  console.log(`[${new Date().toISOString()}] ${message}`);
}

こちらはコンシューマーでも使います。

コンシューマーを作成する

次は、コンシューマーを作成します。

src/Consumer.ts

import amqp from 'amqplib';
import { log } from './log';

export class Consumer {
  private name: string;

  private conn: amqp.Connection;
  private channel: amqp.Channel;
  private exchange: string;
  private queue: string;
  private routingKeys: string[];

  private receivedMessages: string[] = [];

  constructor(
    name: string,
    conn: amqp.Connection,
    channel: amqp.Channel,
    exchange: string,
    queue: string,
    routingKeys: string[],
  ) {
    this.name = name;
    this.conn = conn;
    this.channel = channel;
    this.exchange = exchange;
    this.queue = queue;
    this.routingKeys = routingKeys;
  }

  static async create(name: string, url: string, exchange: string, routingKeys: string[]): Promise<Consumer> {
    const conn = await amqp.connect(url);
    const channel = await conn.createChannel();

    await channel.assertExchange(exchange, 'topic', { durable: false });
    const queue = await channel.assertQueue('', { exclusive: true });

    return new Consumer(name, conn, channel, exchange, queue.queue, routingKeys);
  }

  async start(): Promise<void> {
    for (const routingKey of this.routingKeys) {
      await this.channel.bindQueue(this.queue, this.exchange, routingKey);
    }

    await this.channel.consume(
      this.queue,
      async (message) => {
        const messageAsString = message!.content.toString();
        log(`  [${this.name} - ${message?.fields.routingKey}] Received, ${messageAsString}`);

        this.receivedMessages.push(messageAsString);
      },
      {
        noAck: true,
      },
    );

    log(` [${this.name} - [${this.routingKeys.join(', ')}]] waiting for message, for queue[${this.queue}]`);
  }

  getReceivedMessages(): string[] {
    return [...this.receivedMessages];
  }

  async close(): Promise<void> {
    await this.conn.close();
  }
}

Channel#assertExchangeまではプロデューサーと同じで、その後にChannel#assertQueueを呼び出します。

  static async create(name: string, url: string, exchange: string, routingKeys: string[]): Promise<Consumer> {
    const conn = await amqp.connect(url);
    const channel = await conn.createChannel();

    await channel.assertExchange(exchange, 'topic', { durable: false });
    const queue = await channel.assertQueue('', { exclusive: true });

    return new Consumer(name, conn, channel, exchange, queue.queue, routingKeys);
  }

Channel-oriented API reference / API reference / Channel / assertQueue

Routingの時もそうだったのですが、Channel#assertQueueでキューの名前を空文字列('')で指定しているところがポイントで、
こうするとamq.gen-JzTY20BRgKO-HjmUJj0wLgのようなランダムな名前でキューが作成されます。 またexclusiveをtrueに
しているので、これは排他的な一時キューとなり、接続がクローズされると削除されるものになります。

そしてキューにバインドします。

    for (const routingKey of this.routingKeys) {
      await this.channel.bindQueue(this.queue, this.exchange, routingKey);
    }

この時、第3引数にルーティングキーを渡すことで、プロデューサーが指定したルーティングキーと一致する場合にこのコンシューマーは
キューからメッセージを受信できます。
なお、今回はひとつのコンシューマーで複数のルーティングキーをバインドできるようにしています。

メッセージの受信。

    await this.channel.consume(
      this.queue,
      async (message) => {
        const messageAsString = message!.content.toString();
        log(`  [${this.name} - ${message?.fields.routingKey}] Received, ${messageAsString}`);

        this.receivedMessages.push(messageAsString);
      },
      {
        noAck: true,
      },
    );

どのルーティングキーによるメッセージなのかは、受信したメッセージの中身で判別できます。

noAckをtrueにしているので、メッセージを受信したらその時点でブローカーからメッセージを削除します。

これでプロデューサーとコンシューマーの準備は完了です。

テストコードで確認する

確認はテストコードで行います。

test/topics.test.ts

import { setTimeout } from 'timers/promises';
import { Consumer } from '../src/Consumer';
import { Producer } from '../src/Producer';

jest.setTimeout(10 * 1000);

test('topics test', async () => {
  const url = 'amqp://kazuhira:password@172.17.0.2:5672';
  const exchange = 'topic_logs';

  const producer = await Producer.create(url, exchange);

  // ルーティングキー info のメッセージを受け取る
  const infoLevelConsumer = await Consumer.create('info-level-consumer', url, exchange, ['info']);
  // 任意の1単語のルーティングキーのメッセージを受け取る
  const wildcardWordConsumer = await Consumer.create('wildcard-word-consumer', url, exchange, ['*']);
  // ルーティングキー web.[任意の1単語] のメッセージを受け取る
  const webCategoryConsumer = await Consumer.create('web-category-consumer', url, exchange, ['web.*']);
  // ルーティングキー cli.info のメッセージを受け取る
  const cliInfoConsumer = await Consumer.create('cli-info-consumer', url, exchange, ['cli.info']);
  // ルーティングキー [任意の0以上の単語].info のメッセージを受け取る
  const allCategoryInfoConsumer = await Consumer.create('all-category-info-consumer', url, exchange, ['#.info']);
  // ルーティングキー web.[任意の1単語] または cli.[任意の1単語] のメッセージを受け取る
  const webAndCliCategoryConsumer = await Consumer.create('web-and-cli-category-consumer', url, exchange, [
    'web.*',
    'cli.*',
  ]);

  try {
    await infoLevelConsumer.start();
    await wildcardWordConsumer.start();
    await webCategoryConsumer.start();
    await cliInfoConsumer.start();
    await allCategoryInfoConsumer.start();
    await webAndCliCategoryConsumer.start();

    // routing key, info
    producer.send('info', 'Info Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([]);

    // routing key, warn
    producer.send('warn', 'Warn Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([]);

    // routing key, web.info
    producer.send('web.info', '[Web] Info Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message', '[Web] Info Message']);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);

    // routing key, cli.warn
    producer.send('cli.warn', '[Cli] Warn Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message', '[Web] Info Message']);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message', '[Cli] Warn Message']);

    // routing key, cli.info
    producer.send('cli.info', '[Cli] Info Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual(['[Cli] Info Message']);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual([
      'Info Message',
      '[Web] Info Message',
      '[Cli] Info Message',
    ]);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([
      '[Web] Info Message',
      '[Cli] Warn Message',
      '[Cli] Info Message',
    ]);

    // routing key, category.sub-category.info
    producer.send('category.sub-category.info', '[Category / Sub Category] Info Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual(['[Cli] Info Message']);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual([
      'Info Message',
      '[Web] Info Message',
      '[Cli] Info Message',
      '[Category / Sub Category] Info Message',
    ]);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([
      '[Web] Info Message',
      '[Cli] Warn Message',
      '[Cli] Info Message',
    ]);
  } finally {
    await producer.close();
    await infoLevelConsumer.close();
    await wildcardWordConsumer.close();
    await webCategoryConsumer.close();
    await cliInfoConsumer.close();
    await allCategoryInfoConsumer.close();
    await webAndCliCategoryConsumer.close();
  }
});

第4引数の配列がルーティングキーです。お題はチュートリアルと似ているのですが、カテゴリごとのログレベルにしています。

  // ルーティングキー info のメッセージを受け取る
  const infoLevelConsumer = await Consumer.create('info-level-consumer', url, exchange, ['info']);
  // 任意の1単語のルーティングキーのメッセージを受け取る
  const wildcardWordConsumer = await Consumer.create('wildcard-word-consumer', url, exchange, ['*']);
  // ルーティングキー web.[任意の1単語] のメッセージを受け取る
  const webCategoryConsumer = await Consumer.create('web-category-consumer', url, exchange, ['web.*']);
  // ルーティングキー cli.info のメッセージを受け取る
  const cliInfoConsumer = await Consumer.create('cli-info-consumer', url, exchange, ['cli.info']);
  // ルーティングキー [任意の0以上の単語].info のメッセージを受け取る
  const allCategoryInfoConsumer = await Consumer.create('all-category-info-consumer', url, exchange, ['#.info']);
  // ルーティングキー web.[任意の1単語] または cli.[任意の1単語] のメッセージを受け取る
  const webAndCliCategoryConsumer = await Consumer.create('web-and-cli-category-consumer', url, exchange, [
    'web.*',
    'cli.*',
  ]);

コンシューマーが受け取るメッセージのルールは、コメントに書いています。

ここで、ワイルドカードには2種類あったことを思い出しましょう。

  • * … ひとつの単語の代わりに使用できる
  • # … 0以上の単語を置き換えることができる

あとはコンシューマーを開始して、ルーティングキーを指定してメッセージを送り、確認していきます。

ルーティングキーinfo。

    // routing key, info
    producer.send('info', 'Info Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([]);

ルーティングキーwarn。

    // routing key, warn
    producer.send('warn', 'Warn Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([]);

ルーティングキーweb.info。

    // routing key, web.info
    producer.send('web.info', '[Web] Info Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message', '[Web] Info Message']);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);

ルーティングキーcli.warn。

    // routing key, cli.warn
    producer.send('cli.warn', '[Cli] Warn Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message', '[Web] Info Message']);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message', '[Cli] Warn Message']);

ルーティングキーcli.info。

    // routing key, cli.info
    producer.send('cli.info', '[Cli] Info Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual(['[Cli] Info Message']);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual([
      'Info Message',
      '[Web] Info Message',
      '[Cli] Info Message',
    ]);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([
      '[Web] Info Message',
      '[Cli] Warn Message',
      '[Cli] Info Message',
    ]);

ルーティングキーcategory.sub-category.info。

    // routing key, category.sub-category.info
    producer.send('category.sub-category.info', '[Category / Sub Category] Info Message');

    await setTimeout(1 * 1000);

    expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']);
    expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']);
    expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);
    expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual(['[Cli] Info Message']);
    expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual([
      'Info Message',
      '[Web] Info Message',
      '[Cli] Info Message',
      '[Category / Sub Category] Info Message',
    ]);
    expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([
      '[Web] Info Message',
      '[Cli] Warn Message',
      '[Cli] Info Message',
    ]);

こんな感じで、メッセージ送信時に指定するルーティングキーと、受信側で受け取れるルーティングキーの指定でどのようにメッセージが
振り分けられていくのかを確認します。

テストを実行。

$ npm test

出力されるログは、こんな感じになります。

> topics@1.0.0 test
> jest

  console.log
    [2023-11-12T13:59:38.164Z]  [info-level-consumer - [info]] waiting for message, for queue[amq.gen-OJspjJWyM8gOVf3cALgU8A]

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:38.185Z]  [wildcard-word-consumer - [*]] waiting for message, for queue[amq.gen-tKBwn1jMIRsTnPyFOdBxug]

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:38.189Z]  [web-category-consumer - [web.*]] waiting for message, for queue[amq.gen-5dDYztuTqk0D9nO1lqmTUQ]

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:38.193Z]  [cli-info-consumer - [cli.info]] waiting for message, for queue[amq.gen-ahlQp-YTm6DWOOWi_wEFZA]

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:38.197Z]  [all-category-info-consumer - [#.info]] waiting for message, for queue[amq.gen-f6KPc3QLIVgNq0Qviej8HA]

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:38.201Z]  [web-and-cli-category-consumer - [web.*, cli.*]] waiting for message, for queue[amq.gen-DzAdFyAn0vc3-r4hHNwhWw]

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:38.204Z]  [x] Send 'Info Message'

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:38.208Z]   [wildcard-word-consumer - info] Received, Info Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:38.211Z]   [all-category-info-consumer - info] Received, Info Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:38.212Z]   [info-level-consumer - info] Received, Info Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:39.213Z]  [x] Send 'Warn Message'

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:39.217Z]   [wildcard-word-consumer - warn] Received, Warn Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:40.220Z]  [x] Send '[Web] Info Message'

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:40.225Z]   [web-and-cli-category-consumer - web.info] Received, [Web] Info Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:40.229Z]   [all-category-info-consumer - web.info] Received, [Web] Info Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:40.233Z]   [web-category-consumer - web.info] Received, [Web] Info Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:41.228Z]  [x] Send '[Cli] Warn Message'

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:41.232Z]   [web-and-cli-category-consumer - cli.warn] Received, [Cli] Warn Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:42.235Z]  [x] Send '[Cli] Info Message'

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:42.240Z]   [all-category-info-consumer - cli.info] Received, [Cli] Info Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:42.246Z]   [cli-info-consumer - cli.info] Received, [Cli] Info Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:42.251Z]   [web-and-cli-category-consumer - cli.info] Received, [Cli] Info Message

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:43.247Z]  [x] Send '[Category / Sub Category] Info Message'

      at log (src/log.ts:24:11)

  console.log
    [2023-11-12T13:59:43.251Z]   [all-category-info-consumer - category.sub-category.info] Received, [Category / Sub Category] Info Message

      at log (src/log.ts:24:11)

 PASS  test/topics.test.ts (6.847 s)
  ✓ topics test (6628 ms)

Test Suites: 1 passed, 1 total
Tests:       1 passed, 1 total
Snapshots:   0 total
Time:        6.9 s, estimated 7 s
Ran all test suites.

OKですね。

また、テスト実行中に以下のコマンドを実行してみます。

$ sudo -u rabbitmq rabbitmqctl list_exchanges
$ sudo -u rabbitmq rabbitmqctl list_queues
$ sudo -u rabbitmq rabbitmqctl list_bindings

結果。

$ sudo -u rabbitmq rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.direct      direct
        direct
amq.fanout      fanout
amq.topic       topic
amq.match       headers
amq.rabbitmq.trace      topic
amq.headers     headers
topic_logs      topic


$ sudo -u rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-inw-ulcTsS2vXTcsV5oPfQ  0
amq.gen-AKmcQWVbO8w69uoGF_2IZQ  0
amq.gen-kuMaXrOg9yoMRtMeOWpu8g  0
amq.gen-QDLMnCjnkUvbvM36OywdBw  0
amq.gen-WnRhEe5b8ATpBrhFWXkn6A  0
amq.gen-HK7x22ChT_KEudcQwgvsAQ  0


$ sudo -u rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-inw-ulcTsS2vXTcsV5oPfQ  queue   amq.gen-inw-ulcTsS2vXTcsV5oPfQ  []
        exchange        amq.gen-AKmcQWVbO8w69uoGF_2IZQ  queue   amq.gen-AKmcQWVbO8w69uoGF_2IZQ  []
        exchange        amq.gen-kuMaXrOg9yoMRtMeOWpu8g  queue   amq.gen-kuMaXrOg9yoMRtMeOWpu8g  []
        exchange        amq.gen-QDLMnCjnkUvbvM36OywdBw  queue   amq.gen-QDLMnCjnkUvbvM36OywdBw  []
        exchange        amq.gen-WnRhEe5b8ATpBrhFWXkn6A  queue   amq.gen-WnRhEe5b8ATpBrhFWXkn6A  []
        exchange        amq.gen-HK7x22ChT_KEudcQwgvsAQ  queue   amq.gen-HK7x22ChT_KEudcQwgvsAQ  []
topic_logs      exchange        amq.gen-QDLMnCjnkUvbvM36OywdBw  queue   #.info  []
topic_logs      exchange        amq.gen-inw-ulcTsS2vXTcsV5oPfQ  queue   *       []
topic_logs      exchange        amq.gen-HK7x22ChT_KEudcQwgvsAQ  queue   cli.*   []
topic_logs      exchange        amq.gen-AKmcQWVbO8w69uoGF_2IZQ  queue   cli.info        []
topic_logs      exchange        amq.gen-WnRhEe5b8ATpBrhFWXkn6A  queue   info    []
topic_logs      exchange        amq.gen-HK7x22ChT_KEudcQwgvsAQ  queue   web.*   []
topic_logs      exchange        amq.gen-kuMaXrOg9yoMRtMeOWpu8g  queue   web.*   []

Topic Exchangeがあり

topic_logs      topic

一時キューがあり

amq.gen-inw-ulcTsS2vXTcsV5oPfQ  0
amq.gen-AKmcQWVbO8w69uoGF_2IZQ  0
amq.gen-kuMaXrOg9yoMRtMeOWpu8g  0
amq.gen-QDLMnCjnkUvbvM36OywdBw  0
amq.gen-WnRhEe5b8ATpBrhFWXkn6A  0
amq.gen-HK7x22ChT_KEudcQwgvsAQ  0

Exchangeと一時キューの紐付けも確認できます。

source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-inw-ulcTsS2vXTcsV5oPfQ  queue   amq.gen-inw-ulcTsS2vXTcsV5oPfQ  []
        exchange        amq.gen-AKmcQWVbO8w69uoGF_2IZQ  queue   amq.gen-AKmcQWVbO8w69uoGF_2IZQ  []
        exchange        amq.gen-kuMaXrOg9yoMRtMeOWpu8g  queue   amq.gen-kuMaXrOg9yoMRtMeOWpu8g  []
        exchange        amq.gen-QDLMnCjnkUvbvM36OywdBw  queue   amq.gen-QDLMnCjnkUvbvM36OywdBw  []
        exchange        amq.gen-WnRhEe5b8ATpBrhFWXkn6A  queue   amq.gen-WnRhEe5b8ATpBrhFWXkn6A  []
        exchange        amq.gen-HK7x22ChT_KEudcQwgvsAQ  queue   amq.gen-HK7x22ChT_KEudcQwgvsAQ  []
topic_logs      exchange        amq.gen-QDLMnCjnkUvbvM36OywdBw  queue   #.info  []
topic_logs      exchange        amq.gen-inw-ulcTsS2vXTcsV5oPfQ  queue   *       []
topic_logs      exchange        amq.gen-HK7x22ChT_KEudcQwgvsAQ  queue   cli.*   []
topic_logs      exchange        amq.gen-AKmcQWVbO8w69uoGF_2IZQ  queue   cli.info        []
topic_logs      exchange        amq.gen-WnRhEe5b8ATpBrhFWXkn6A  queue   info    []
topic_logs      exchange        amq.gen-HK7x22ChT_KEudcQwgvsAQ  queue   web.*   []
topic_logs      exchange        amq.gen-kuMaXrOg9yoMRtMeOWpu8g  queue   web.*   []

ルーティングキーもわかりますね。

こんなところでしょうか。

おわりに

RabbitMQのJavaScriptチュートリアルの「Topics」を、TypeScriptで試してみました。

前回からまた時間が空いたので、少し前のエントリーを見ながら思い出しつつやってみました。ワイルドカードの扱いをコロッと
忘れたりしていましたが、なんとか確認できる形にはなったかなと思います。

「Routing」と混ざりやすい気がしないでもないですが、いろんなことができますね…。