CLOVER🍀

That was when it all began.

RabbitMQのJavaScriptチュートリアルの「Work Queues」をTypeScriptで試す

これは、なにをしたくて書いたもの?

RabbitMQのチュートリアルJavaScriptクライアント+TypeScriptでやっていこう、ということで。

今回は「Work Queues」を扱います。こちらですね。

RabbitMQ tutorial - Work Queues — RabbitMQ

Work Queues

最初のチュートリアルでは、ひとつのキューに対して、それぞれひとつずつのプロデューサーとコンシューマーを作成しました。

RabbitMQ tutorial - "Hello World!" — RabbitMQ

今回は、複数のコンシューマー(ワーカー)に対して、時間のかかるタスクを分散して処理してもらうチュートリアルになります。

RabbitMQ tutorial - Work Queues — RabbitMQ

このパターンを使うことで、プロデューサーはキューに対してメッセージを送信、ワーカーに処理を依頼することで重たい処理の
完了を待つことを回避することができます。
ワーカーは、キューに入ったメッセージを順次読み出します。

また、このチュートリアルは以下もポイントです。

  • ワーカーに対する、メッセージのラウンドロビンディスパッチ
  • メッセージの配信確認(ACK)
  • メッセージの永続化

では、進めていきましょう。

環境

今回の環境は、こちら。

$ sudo -u rabbitmq rabbitmqctl version
3.11.10

RabbitMQは、172.17.0.2で動作しているものとします。

ユーザーも作成しておきます。

$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password
$ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*'
$ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring

使用するNode.jsのバージョン。

$ node --version
v18.15.0


$ npm --version
9.5.0

準備

まずはNode.jsプロジェクトを作成します。

$ npm init -y
$ npm i -D typescript
$ npm i -D @types/node@v18
$ npm i -D prettier
$ npm i -D jest @types/jest
$ npm i -D esbuild esbuild-jest

確認は、テストコードで行うことにします。

RabbitMQに接続するためのamqplibとその型定義をインストール。

$ npm i amqplib
$ npm i -D @types/amqplib

ソースコードを配置するディレクトリを作成。

$ mkdir src test

依存関係は、こうなりました。

  "devDependencies": {
    "@types/amqplib": "^0.10.1",
    "@types/jest": "^29.5.0",
    "@types/node": "^18.15.3",
    "esbuild": "^0.17.12",
    "esbuild-jest": "^0.5.0",
    "jest": "^29.5.0",
    "prettier": "^2.8.4",
    "typescript": "^5.0.2"
  },
  "dependencies": {
    "amqplib": "^0.10.3"
  }

scripts

  "scripts": {
    "build": "tsc --project .",
    "build:watch": "tsc --project . --watch",
    "typecheck": "tsc --project ./tsconfig.typecheck.json",
    "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch",
    "test": "jest",
    "format": "prettier --write src test"
  },

各種設定ファイル。

tsconfig.json

{
  "compilerOptions": {
    "target": "esnext",
    "module": "commonjs",
    "moduleResolution": "node",
    "lib": ["esnext"],
    "baseUrl": "./src",
    "outDir": "dist",
    "strict": true,
    "forceConsistentCasingInFileNames": true,
    "noFallthroughCasesInSwitch": true,
    "noImplicitOverride": true,
    "noImplicitReturns": true,
    "noPropertyAccessFromIndexSignature": true,
    "esModuleInterop": true
  },
  "include": [
    "src"
  ]
}

tsconfig.typecheck.json

{
  "extends": "./tsconfig",
  "compilerOptions": {
    "baseUrl": "./",
    "noEmit": true
  },
  "include": [
    "src", "test"
  ]
}

.prettierrc.json

{
  "singleQuote": true,
  "printWidth": 120
}

jest.config.js

module.exports = {
  testEnvironment: 'node',
  transform: {
    "^.+\\.tsx?$": "esbuild-jest"
  }
};

RabbitMQのJavaScriptチュートリアルの「Work Queues」をTypeScriptとJestで試す

では、こちらをTypeScriptとJestで試していきます。

RabbitMQ tutorial - Work Queues — RabbitMQ

もとのソースコードはプロデューサーとコンシューマー × 2をそれぞれ別プロセスで起動しているのですが、今回は頑張ってこれを
テストコード内で表現してみましょう。

プロデューサーを作成する

最初はプロデューサーを作成します。今回はプロデューサーもワーカーも、クラスとして作成することにしました。

src/Producer.ts

import amqp from 'amqplib';

export class Producer {
  private conn: amqp.Connection;
  private channel: amqp.Channel;
  private queue: string;

  constructor(conn: amqp.Connection, queue: string, channel: amqp.Channel) {
    this.conn = conn;
    this.queue = queue;
    this.channel = channel;
  }

  static async create(url: string, queue: string): Promise<Producer> {
    const conn = await amqp.connect(url);

    const channel = await conn.createChannel();

    channel.assertQueue(queue, { durable: true });

    return new Producer(conn, queue, channel);
  }

  send(message: string): void {
    this.channel.sendToQueue(this.queue, Buffer.from(message), { persistent: true });

    console.log(" [x] Send '%s'", message);
  }

  async close(): Promise<void> {
    await this.conn.close();
  }
}

インスタンスを作成して、sendメソッドでメッセージ送信、最後にcloseする使い方になります。

最初のチュートリアルと使っているAPIはほぼ変わらないのですが、以下の2箇所が異なります。

    channel.assertQueue(queue, { durable: true });

    this.channel.sendToQueue(this.queue, Buffer.from(message), { persistent: true });

Channel#assertQueueに指定するオプションであるdurabletrueにすることで、RabbitMQが再起動してもメッセージが失われない
キューとすることができます。

Channel-oriented API reference / API reference / Channel / assertQueue

また、永続化するメッセージであることを示すには、ConfirmChannel#sendToQueueのオプションであるpersistenttrueにする必要が
あります。

Channel-oriented API reference / API reference / ConfirmChannel / sendToQueue

このメソッドにはオプションの説明が記載されていないのですが、こちらを見れば良さそうですね。

Channel-oriented API reference / API reference / Channel / publish

ワーカーを作成する

次に、ワーカーを作成します。

src/Worker.ts

import amqp from 'amqplib';
import { setTimeout } from 'timers/promises';

export class Worker {
  private name: string;

  private conn: amqp.Connection;
  private channel: amqp.Channel;
  private queue: string;

  private receivedMessages: string[] = [];

  constructor(name: string, conn: amqp.Connection, queue: string, channel: amqp.Channel) {
    this.name = name;
    this.conn = conn;
    this.queue = queue;
    this.channel = channel;
  }

  static async create(name: string, url: string, queue: string): Promise<Worker> {
    const conn = await amqp.connect(url);

    const channel = await conn.createChannel();

    await channel.assertQueue(queue, { durable: true });

    return new Worker(name, conn, queue, channel);
  }

  async startWithNoAck(): Promise<void> {
    await this.channel.consume(
      this.queue,
      async (message) => {
        const contentAsString = message!.content.toString();

        const secs = contentAsString.split('.').length - 1;

        console.log(' %s: [x] Received %s, Sleep Time %d', this.name, contentAsString, secs);

        await setTimeout(secs * 1000);

        this.receivedMessages.push(contentAsString);
      },
      {
        noAck: true,
      }
    );
  }

  async startWithAck(): Promise<void> {
    this.channel.prefetch(1);

    await this.channel.consume(
      this.queue,
      async (message) => {
        const contentAsString = message!.content.toString();

        const secs = contentAsString.split('.').length - 1;

        console.log(' %s: [x] Received %s, Sleep Time %d', this.name, contentAsString, secs);

        await setTimeout(secs * 1000);

        this.receivedMessages.push(contentAsString);

        this.channel.ack(message!);
      },
      {
        noAck: false,
      }
    );
  }

  getReceivedMessages(): string[] {
    return [...this.receivedMessages];
  }

  async close(): Promise<void> {
    await this.conn.close();
  }
}

インスタンスを作成した後に、start...メソッドでメッセージの受信を行い、getReceivedMessagesでその時点で受信している
メッセージの配列を返します。最後にclose、ですね。

キューの作成(存在確認)は、プロデューサーと同じようにdurabletrueにしておきます。

    await channel.assertQueue(queue, { durable: true });

メッセージの受信は、2通り用意しています。

まずは、Channel#consumeの呼び出しの際にオプションのnoAcktrueに指定するパターン。

  async startWithNoAck(): Promise<void> {
    await this.channel.consume(
      this.queue,
      async (message) => {
        const contentAsString = message!.content.toString();

        const secs = contentAsString.split('.').length - 1;

        console.log(' %s: [x] Received %s, Sleep Time %d', this.name, contentAsString, secs);

        await setTimeout(secs * 1000);

        this.receivedMessages.push(contentAsString);
      },
      {
        noAck: true,
      }
    );
  }

noAckは、trueにするとブローカーはコンシューマーに配信したメッセージの確認を行わず、ブローカーがメッセージをネットワークに
送信した時点ですぐにメッセージを削除します。
falseにすると、コンシューマーがメッセージの確認結果をブローカーに返す必要があります。この時、確認を行わなかった場合は
Channelを閉じるとメッセージが再度キューに入れられることがあります。

デフォルトはfalseです。

Channel-oriented API reference / API reference / Channel / consume

こちらのメソッド定義では、noAcktrueにしているのですぐにRabbitMQブローカーからすぐにメッセージを削除することになります。

もうひとつの定義。

  async startWithAck(): Promise<void> {
    this.channel.prefetch(1);

    await this.channel.consume(
      this.queue,
      async (message) => {
        const contentAsString = message!.content.toString();

        const secs = contentAsString.split('.').length - 1;

        console.log(' %s: [x] Received %s, Sleep Time %d', this.name, contentAsString, secs);

        await setTimeout(secs * 1000);

        this.receivedMessages.push(contentAsString);

        this.channel.ack(message!);
      },
      {
        noAck: false,
      }
    );
  }

こちらではnoAckfalseにしています。

こうすると、コンシューマーからブローカーに確認結果を送り返す必要があります。それが以下のChannel#ackの部分です。

        this.channel.ack(message!);

Channel-oriented API reference / API reference / Channel / ack

第1引数には、確認対象のメッセージを指定します。
第2引数をtrueにすると指定されたメッセージおよびそれ以前のメッセージをすべて確認済みにします。省略した場合は、
falseと同じとみなされます。

対象のChannel上のすべてのメッセージを確認対象とする、Channel#ackAllというメソッドもあります。

Channel-oriented API reference / API reference / Channel / ackAll

この反対として、対象のメッセージを拒否するChannel#nack、未処理のメッセージをすべて拒否するChannel#nackAllもあります。
requeuetrueにすると、キューに戻す動きになるようです。

また、デフォルトでRabbitMQはメッセージをラウンドロビンでコンシューマーに送信します。ただ、これはメッセージを単純に
ディスパッチするので、RabbitMQブローカーがコンシューマーにひとつずつメッセージを送信する(一気に複数のメッセージを
送信しないようにする)ようにしています。

それが以下ですね。

    this.channel.prefetch(1);

Channel-oriented API reference / API reference / Channel / prefetch

これは、コンシューマー単位のプリフェッチ数を設定します。ここで指定する値は、コンシューマー単位で確認(ACK)を待つ最大数です。
値を設定すると、ひとつまたはそれ以上の確認(ACK)がコンシューマーから送信されるまで、ブローカーはそのコンシューマーに
メッセージを送信しなくなります。

テストコードで確認する

最後は、テストコードを書いて確認しましょう。

test/work-queues.test.ts

import { setTimeout } from 'timers/promises';
import { Producer } from '../src/Producer';
import { Worker } from '../src/Worker';

jest.setTimeout(30 * 1000);

test('work queues, no ack', async () => {
  const url = 'amqp://kazuhira:password@172.17.0.2:5672';
  const queue = 'task_queue_auto_ack';

  const producer = await Producer.create(url, queue);

  const worker1 = await Worker.create('no-ack-worker1', url, queue);
  const worker2 = await Worker.create('no-ack-worker2', url, queue);

  try {
    await worker1.startWithNoAck();
    await worker2.startWithNoAck();

    producer.send('First message.');
    producer.send('Second message..');
    producer.send('Third message...');
    producer.send('Fourth message....');
    producer.send('Fifth message.....');

    await setTimeout(2 * 1000); // send wait

    await setTimeout(10 * 1000); // receive wait

    expect(worker1.getReceivedMessages()).toEqual(['First message.', 'Third message...', 'Fifth message.....']);
    expect(worker2.getReceivedMessages()).toEqual(['Second message..', 'Fourth message....']);
  } finally {
    await producer.close();
    await worker1.close();
    await worker2.close();
  }
});

test('work queues, with ack', async () => {
  const url = 'amqp://kazuhira:password@172.17.0.2:5672';
  const queue = 'task_queue_manual_ack';

  const producer = await Producer.create(url, queue);

  const worker1 = await Worker.create('with-ack-worker1', url, queue);
  const worker2 = await Worker.create('with-ack-worker2', url, queue);

  try {
    await worker1.startWithAck();
    await worker2.startWithAck();

    producer.send('First message.');
    producer.send('Second message..');
    producer.send('Third message...');
    producer.send('Fourth message....');
    producer.send('Fifth message.....');

    await setTimeout(2 * 1000); // send wait

    await setTimeout(10 * 1000); // receive wait

    expect(worker1.getReceivedMessages()).toEqual(['First message.', 'Third message...', 'Fifth message.....']);
    expect(worker2.getReceivedMessages()).toEqual(['Second message..', 'Fourth message....']);
  } finally {
    await producer.close();
    await worker1.close();
    await worker2.close();
  }
});

それぞれ、確認(ACK)なし、確認(ACK)ありのパターンです。今回は結果は同じですが、メッセージがラウンドロビン
振り分けられていることも確認できます。

では、実行。

$ npm test

コンソール上のログは、こんな感じになります。

> work-queues@1.0.0 test
> jest

  console.log
     [x] Send 'First message.'

      at Producer.send (src/Producer.ts:48:13)

  console.log
     [x] Send 'Second message..'

      at Producer.send (src/Producer.ts:48:13)

  console.log
     [x] Send 'Third message...'

      at Producer.send (src/Producer.ts:48:13)

  console.log
     [x] Send 'Fourth message....'

      at Producer.send (src/Producer.ts:48:13)

  console.log
     [x] Send 'Fifth message.....'

      at Producer.send (src/Producer.ts:48:13)

  console.log
     no-ack-worker1: [x] Received First message., Sleep Time 1

      at channel.consume.noAck (src/Worker.ts:55:17)

  console.log
     no-ack-worker2: [x] Received Second message.., Sleep Time 2

      at channel.consume.noAck (src/Worker.ts:55:17)

  console.log
     no-ack-worker2: [x] Received Fourth message...., Sleep Time 4

      at channel.consume.noAck (src/Worker.ts:55:17)

  console.log
     no-ack-worker1: [x] Received Third message..., Sleep Time 3

      at channel.consume.noAck (src/Worker.ts:55:17)

  console.log
     no-ack-worker1: [x] Received Fifth message....., Sleep Time 5

      at channel.consume.noAck (src/Worker.ts:55:17)

  console.log
     [x] Send 'First message.'

      at Producer.send (src/Producer.ts:48:13)

  console.log
     [x] Send 'Second message..'

      at Producer.send (src/Producer.ts:48:13)

  console.log
     [x] Send 'Third message...'

      at Producer.send (src/Producer.ts:48:13)

  console.log
     [x] Send 'Fourth message....'

      at Producer.send (src/Producer.ts:48:13)

  console.log
     [x] Send 'Fifth message.....'

      at Producer.send (src/Producer.ts:48:13)

  console.log
     with-ack-worker1: [x] Received First message., Sleep Time 1

      at channel.consume.noAck (src/Worker.ts:71:17)

  console.log
     with-ack-worker2: [x] Received Second message.., Sleep Time 2

      at channel.consume.noAck (src/Worker.ts:71:17)

  console.log
     with-ack-worker1: [x] Received Third message..., Sleep Time 3

      at channel.consume.noAck (src/Worker.ts:71:17)

  console.log
     with-ack-worker2: [x] Received Fourth message...., Sleep Time 4

      at channel.consume.noAck (src/Worker.ts:71:17)

  console.log
     with-ack-worker1: [x] Received Fifth message....., Sleep Time 5

      at channel.consume.noAck (src/Worker.ts:71:17)

 PASS  test/work-queues.test.ts (24.796 s)
  ✓ work queues, no ack (12259 ms)
  ✓ work queues, with ack (12215 ms)

Test Suites: 1 passed, 1 total
Tests:       2 passed, 2 total
Snapshots:   0 total
Time:        24.864 s, estimated 25 s
Ran all test suites.

OKですね。

まとめ

RabbitMQのJavaScriptチュートリアルの「Work Queues」を、TypeScriptで試してみました。

テストコードでの確認にあたって、扱いやすいようにクラスにまとめたりとだいぶ四苦八苦しましたが、最終的にチュートリアル
同じことが実現できたので良かったのかなと。

TypeScriptの学習も兼ねているので。