CLOVER🍀

That was when it all began.

RabbitMQのJavaScriptチュートリアルの「Publish/Subscribe」をTypeScriptで試す

これは、なにをしたくて書いたもの?

RabbitMQのチュートリアルJavaScriptクライアント+TypeScriptでやっていこう、ということで。

今回は「Publish/Subscribe」を扱います。こちらですね。

RabbitMQ tutorial - Publish/Subscribe — RabbitMQ

Publish/Subscribe

「Publish/Subscribe」では、複数のコンシューマーにメッセージを配信します。

RabbitMQ tutorial - Publish/Subscribe — RabbitMQ

これは、Publish/Subscribeというパターンとして知られているものです。

このチュートリアルでは、単純なログシステムを構築します。2つの役割から構成され、ひとつはログメッセージの送信する
プロデューサー、もうひとつはメッセージを受信するコンシューマーです。 つまり、メッセージがコンシューマーにブロードキャスト的に送信されるわけです。

ここでのポイントは、プロデューサーは直接キューにメッセージを送信するのではなく、Exchangeを1度挟むということです。
Exchangeを作成する時に、タイプを指定することができます。

ch.assertExchange('logs', 'fanout', {durable: false})

Exchangeには、directtopicheadersfanoutの4つのタイプから選択できます。

AMQP 0-9-1 Model Explained / Exchanges and Exchange Types

fanoutは、Exchangeにバインドされているすべてのキューにメッセージのコピーを配信するタイプです。

AMQP 0-9-1 Model Explained / Exchanges and Exchange Types / Fanout Exchange

これでメッセージのブロードキャストが行われることになります。

メッセージの送信時には、Channel#publishの第1引数にExchangeの名前、第2引数にキューの名前を指定しますが、キューの名前は
空にしておきます。

channel.publish( 'logs' , '' , Buffer.from( 'Hello World!' ));

これは、特定のキューにメッセージを送信するわけではなく、Exchangeに送信したいことだけを宣言するものです。

ではキューはどうするかというと、コンシューマーに一時的なキューを作成してもらいます。

一時的なキューを作成するには、Channel#assertQueueのキュー名の部分を空文字列にします。

channel.assertQueue('', {
  exclusive: true
});

これで、amq.gen-JzTY20BRgKO-HjmUJj0wLgのようなランダムな名前を持ったキューが作成されます。また、exclusivetrue
することで排他的なキューを表し、コンシューマーが接続を切断すると一時キューは削除されます。

一時キューを作成する他の方法としては、TTLや自動削除の機能を使うものもあるようです。

ここまででファンアウトExchangeと一時キューができたので、この2つを以下で関連付けます。

channel.bindQueue(queue_name, 'logs', '');

これをバインディングと呼ぶようです。

では、ここまでの内容をNode.js+TypeScriptで試してみます。

環境

今回の環境は、こちら。

$ sudo -u rabbitmq rabbitmqctl version
3.11.12

RabbitMQは、172.17.0.2で動作しているものとします。

ユーザーも作成しておきます。

$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password
$ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*'
$ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring

使用するNode.jsのバージョン。

$ node --version
v18.15.0


$ npm --version
9.5.0

準備

では、まずはNode.jsプロジェクトを作成します。

$ npm init -y
$ npm i -D typescript
$ npm i -D @types/node@v18
$ npm i -D prettier
$ npm i -D jest @types/jest
$ npm i -D esbuild esbuild-jest

確認は、テストコードで行います。

RabbitMQに接続するためのamqplibとその型定義をインストール。

$ npm i amqplib
$ npm i -D @types/amqplib

ソースコードを配置するディレクトリを作成。

$ mkdir src test

依存関係は、こうなりました。

  "devDependencies": {
    "@types/amqplib": "^0.10.1",
    "@types/jest": "^29.5.0",
    "@types/node": "^18.15.11",
    "esbuild": "^0.17.15",
    "esbuild-jest": "^0.5.0",
    "jest": "^29.5.0",
    "prettier": "^2.8.7",
    "typescript": "^5.0.3"
  },
  "dependencies": {
    "amqplib": "^0.10.3"
  }

scripts

  "scripts": {
    "build": "tsc --project .",
    "build:watch": "tsc --project . --watch",
    "typecheck": "tsc --project ./tsconfig.typecheck.json",
    "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch",
    "test": "jest",
    "format": "prettier --write src test"
  },

各種設定ファイル。

tsconfig.json

{
  "compilerOptions": {
    "target": "esnext",
    "module": "commonjs",
    "moduleResolution": "node",
    "lib": ["esnext"],
    "baseUrl": "./src",
    "outDir": "dist",
    "strict": true,
    "forceConsistentCasingInFileNames": true,
    "noFallthroughCasesInSwitch": true,
    "noImplicitOverride": true,
    "noImplicitReturns": true,
    "noPropertyAccessFromIndexSignature": true,
    "esModuleInterop": true
  },
  "include": [
    "src"
  ]
}

tsconfig.typecheck.json

{
  "extends": "./tsconfig",
  "compilerOptions": {
    "baseUrl": "./",
    "noEmit": true
  },
  "include": [
    "src", "test"
  ]
}

.prettierrc.json

{
  "singleQuote": true,
  "printWidth": 120
}

jest.config.js

module.exports = {
  testEnvironment: 'node',
  transform: {
    "^.+\\.tsx?$": "esbuild-jest"
  }
};

RabbitMQのJavaScriptチュートリアルの「Publish/Subscribe」をTypeScriptとJestで試す

では、こちらをTypeScriptとJestで試していきます。

RabbitMQ tutorial - Publish/Subscribe — RabbitMQ

チュートリアル内ではプロデューサーとコンシューマーをそれぞれ別プロセスで起動していますが、今回はテストコード内で表現して
みます。

プロデューサーを作成する

まずはプロデューサーを作成します。今回はプロデューサーもコンシューマーもクラスとして作成します。

src/Producer.ts

import amqp from 'amqplib';
import { log } from './log';

export class Producer {
  private conn: amqp.Connection;
  private channel: amqp.Channel;
  private exchange: string;

  constructor(conn: amqp.Connection, channel: amqp.Channel, exchange: string) {
    this.conn = conn;
    this.channel = channel;
    this.exchange = exchange;
  }

  static async create(url: string, exchange: string): Promise<Producer> {
    const conn = await amqp.connect(url);
    const channel = await conn.createChannel();

    await channel.assertExchange(exchange, 'fanout', { durable: false });

    return new Producer(conn, channel, exchange);
  }

  send(message: string): void {
    this.channel.publish(this.exchange, '', Buffer.from(message));

    log(` [x] Send '${message}'`);
  }

  async close(): Promise<void> {
    await this.conn.close();
  }
}

プロデューサーのポイントは、ここですね。Channel#assertExchangeでExchangeの存在確認を行い、存在していなければ作成します。

    await channel.assertExchange(exchange, 'fanout', { durable: false });

Channel-oriented API reference / API reference / Channel / assertExchange

第1引数はExchangeの名前です。第2引数はExchangeのタイプで、ブロードキャストとなるfanoutを指定します。
また、durablefalseにしているので、このExchangeはRabbitMQブローカーが再起動すると失われます。

メッセージをExchangeに送信しているのは、こちら。Channel#publishを使います。

    this.channel.publish(this.exchange, '', Buffer.from(message));

Channel-oriented API reference / API reference / Channel / publish

第1引数はExchangeの名前です。第2引数はルーティングキーで、キューへのルーティングを決めるものなのですが、ここで
空の文字列('')を指定することでルーティング先のキューを指定しないことになります。

ところで、logという関数は以下の定義です。

src/log.ts

export function log(message: string): void {
  console.log(`[${new Date().toISOString()}] ${message}`);
}

こちらはコンシューマーでも使います。

コンシューマーを作成する

次は、コンシューマーを作成します。

src/Consumer.ts

import amqp from 'amqplib';
import { log } from './log';

export class Consumer {
  private name: string;

  private conn: amqp.Connection;
  private channel: amqp.Channel;
  private exchange: string;
  private queue: string;

  private receivedMessages: string[] = [];

  constructor(name: string, conn: amqp.Connection, channel: amqp.Channel, exchange: string, queue: string) {
    this.name = name;
    this.conn = conn;
    this.channel = channel;
    this.exchange = exchange;
    this.queue = queue;
  }

  static async create(name: string, url: string, exchange: string): Promise<Consumer> {
    const conn = await amqp.connect(url);
    const channel = await conn.createChannel();

    await channel.assertExchange(exchange, 'fanout', { durable: false });
    const queue = await channel.assertQueue('', { exclusive: true });

    return new Consumer(name, conn, channel, exchange, queue.queue);
  }

  async start(): Promise<void> {
    await this.channel.bindQueue(this.queue, this.exchange, '');

    await this.channel.consume(
      this.queue,
      async (message) => {
        const messageAsString = message!.content.toString();
        log(` [${this.name}] Received, ${messageAsString}`);

        this.receivedMessages.push(messageAsString);
      },
      {
        noAck: true,
      }
    );

    log(`waiting for message, for queue, ${this.queue}`);
  }

  getReceivedMessage(): string[] {
    return [...this.receivedMessages];
  }

  async close(): Promise<void> {
    await this.conn.close();
  }
}

プロデューサーと同様にChannel#assertExchangeとした後に、Channel#assertQueueを呼び出します。

    await channel.assertExchange(exchange, 'fanout', { durable: false });
    const queue = await channel.assertQueue('', { exclusive: true });

Channel-oriented API reference / API reference / Channel / assertQueue

ここで、キューの名前を空文字列('')で指定しているところがポイントで、こうするとamq.gen-JzTY20BRgKO-HjmUJj0wLgのような
ランダムな名前でキューが作成されます。
またexclusivetrueにしているので、これは排他的な一時キューとなり、接続がクローズされると削除されるものになります。

そして、この一時キューとExchangeを紐付けます。それがChannel#bindQueueです。

    await this.channel.bindQueue(this.queue, this.exchange, '');

Channel-oriented API reference / API reference / Channel / bindQueue

この時、一時キューの名前とExchangeの名前を指定します。最後の引数はパターンですが、ここは空文字列を指定します。

あとはメッセージの受信です。

    await this.channel.consume(
      this.queue,
      async (message) => {
        const messageAsString = message!.content.toString();
        log(` [${this.name}] Received, ${messageAsString}`);

        this.receivedMessages.push(messageAsString);
      },
      {
        noAck: true,
      }
    );

今回はnoAcktrueにして、メッセージを受信したらその時点でメッセージをブローカーから削除する動作にしています。

これで、プロデューサーとコンシューマーができました。

テストコードで確認する

では、テストコードで確認してみます。

test/publish-subscribe.test.ts

import { setTimeout } from 'timers/promises';
import { Consumer } from '../src/Consumer';
import { Producer } from '../src/Producer';

test('publish subscribe test', async () => {
  const url = 'amqp://kazuhira:password@172.17.0.2:5672';
  const exchange = 'logs';

  const producer = await Producer.create(url, exchange);

  const consumer1 = await Consumer.create('consumer1', url, exchange);
  const consumer2 = await Consumer.create('consumer2', url, exchange);

  try {
    await consumer1.start();
    await consumer2.start();

    producer.send('Hello World');

    await setTimeout(2 * 1000);

    expect(consumer1.getReceivedMessage()).toEqual(['Hello World']);
    expect(consumer2.getReceivedMessage()).toEqual(['Hello World']);

    producer.send('Hello RabbitMQ');

    await setTimeout(2 * 1000);

    expect(consumer1.getReceivedMessage()).toEqual(['Hello World', 'Hello RabbitMQ']);
    expect(consumer2.getReceivedMessage()).toEqual(['Hello World', 'Hello RabbitMQ']);
  } finally {
    await producer.close();
    await consumer1.close();
    await consumer2.close();
  }
});

Exchangeの名前はlogsとします。

  const exchange = 'logs';

プロデューサーをひとつ、コンシューマーを2つ作成します。

  const producer = await Producer.create(url, exchange);

  const consumer1 = await Consumer.create('consumer1', url, exchange);
  const consumer2 = await Consumer.create('consumer2', url, exchange);

それぞれのコンシュマーをスタートしてから、プロデューサーからメッセージをExchangeに送信する度に、両方のコンシューマーで
メッセージを受信できることが確認できます。

    await consumer1.start();
    await consumer2.start();

    producer.send('Hello World');

    await setTimeout(2 * 1000);

    expect(consumer1.getReceivedMessage()).toEqual(['Hello World']);
    expect(consumer2.getReceivedMessage()).toEqual(['Hello World']);

    producer.send('Hello RabbitMQ');

    await setTimeout(2 * 1000);

    expect(consumer1.getReceivedMessage()).toEqual(['Hello World', 'Hello RabbitMQ']);
    expect(consumer2.getReceivedMessage()).toEqual(['Hello World', 'Hello RabbitMQ']);

実行。

$ npm test

コンソール上のログは、こんな感じになります。

> publish-subscribe@1.0.0 test
> jest

  console.log
    [2023-04-04T16:48:14.669Z] waiting for message, for queue, amq.gen-9ER3hYLMjeN56pYaUQq5SA

      at log (src/log.ts:24:11)

  console.log
    [2023-04-04T16:48:14.708Z] waiting for message, for queue, amq.gen-USWfLj95zKeTiOjBqJXHQQ

      at log (src/log.ts:24:11)

  console.log
    [2023-04-04T16:48:14.713Z]  [x] Send 'Hello World'

      at log (src/log.ts:24:11)

  console.log
    [2023-04-04T16:48:14.718Z]  [consumer2] Received, Hello World

      at log (src/log.ts:24:11)

  console.log
    [2023-04-04T16:48:14.731Z]  [consumer1] Received, Hello World

      at log (src/log.ts:24:11)

  console.log
    [2023-04-04T16:48:16.724Z]  [x] Send 'Hello RabbitMQ'

      at log (src/log.ts:24:11)

  console.log
    [2023-04-04T16:48:16.729Z]  [consumer2] Received, Hello RabbitMQ

      at log (src/log.ts:24:11)

  console.log
    [2023-04-04T16:48:16.734Z]  [consumer1] Received, Hello RabbitMQ

      at log (src/log.ts:24:11)

 PASS  test/publish-subscribe.test.ts (6.323 s)
  ✓ publish subscribe test (4370 ms)

Test Suites: 1 passed, 1 total
Tests:       1 passed, 1 total
Snapshots:   0 total
Time:        6.98 s
Ran all test suites.

OKですね。

また、テスト実行中に以下のコマンドを実行してみましょう。

$ sudo -u rabbitmq rabbitmqctl list_exchanges
$ sudo -u rabbitmq rabbitmqctl list_queues
$ sudo -u rabbitmq rabbitmqctl list_bindings

結果は、それぞれ以下です。

$ sudo -u rabbitmq rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.match       headers
amq.rabbitmq.trace      topic
logs    fanout
        direct
amq.fanout      fanout
amq.direct      direct
amq.topic       topic
amq.headers     headers


$ sudo -u rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-T5dF5HyFmLcK3rwNG1O6Lg  0
amq.gen-BK-ZwFrhhJrragtXXE-fEg  0


$ sudo -u rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-T5dF5HyFmLcK3rwNG1O6Lg  queue   amq.gen-T5dF5HyFmLcK3rwNG1O6Lg  []
        exchange        amq.gen-BK-ZwFrhhJrragtXXE-fEg  queue   amq.gen-BK-ZwFrhhJrragtXXE-fEg  []
logs    exchange        amq.gen-BK-ZwFrhhJrragtXXE-fEg  queue           []
logs    exchange        amq.gen-T5dF5HyFmLcK3rwNG1O6Lg  queue           []

Exchangeとしてlogsがあり、

logs    fanout
        direct

一時キューも確認できます。

amq.gen-T5dF5HyFmLcK3rwNG1O6Lg  0
amq.gen-BK-ZwFrhhJrragtXXE-fEg  0

Exchangeと一時キューの紐付けも確認できますね。

source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-T5dF5HyFmLcK3rwNG1O6Lg  queue   amq.gen-T5dF5HyFmLcK3rwNG1O6Lg  []
        exchange        amq.gen-BK-ZwFrhhJrragtXXE-fEg  queue   amq.gen-BK-ZwFrhhJrragtXXE-fEg  []
logs    exchange        amq.gen-BK-ZwFrhhJrragtXXE-fEg  queue           []
logs    exchange        amq.gen-T5dF5HyFmLcK3rwNG1O6Lg  queue           []

これを見ると、空文字で指定したルーティングキーは、自動で生成される感じになっているのでしょうか…?

ひとまず、動作確認できたのでここまでですね。

まとめ

RabbitMQのJavaScriptチュートリアルの「Publish/Subscribe」を、TypeScriptで試してみました。

「Work Queues」の時にある程度やり方を確立したので、今回は割とあっさりできたかなと思います。