これは、なにをしたくて書いたもの?
RabbitMQのチュートリアルをJavaScriptクライアント+TypeScriptでやっていこう、ということで。
今回は「Topics」を扱います。こちらですね。
RabbitMQ tutorial - Topics — RabbitMQ
Topics
Fanout Exchangeでブロードキャストを、Direct Exchangeでルーティングが行えることを過去2つのチュートリアルで扱いました。
RabbitMQのJavaScriptチュートリアルの「Publish/Subscribe」をTypeScriptで試す - CLOVER🍀
RabbitMQのJavaScriptチュートリアルの「Routing」をTypeScriptで試す - CLOVER🍀
これらに対して、複数の条件でルーティングを行いたいという時に使うのがTopic Exchangeのようです。
AMQP 0-9-1 Model Explained / Exchanges and Exchange Types
RabbitMQ tutorial - Topics — RabbitMQ
topic
は、ルーティングキーに.
で区切られた単語を含める必要があるExchangeです。
AMQP 0-9-1 Model Explained / Exchanges and Exchange Types / Topic Exchange
例としてはstock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
といったもので、255バイト以内の長さであればOKです。
バインディングキーも同じ形式である必要があり、あるルーティングキーで送信されたメッセージは、一致するバインディングキーで
バインドされているすべてのキューに配信されます。
ルーティングキーとバインディングキーの差がわかりにくいですが、送信側で使うのがルーティングキー、受信側で使うのが
バインディングキーという位置づけのようです。
なお、これらのキーに対しては2種類のワイルドカードが定義されています。
*
… ひとつの単語の代わりに使用できる#
… 0以上の単語を置き換えることができる
*.orange.*
やlazy.#
が例になります。
いずれのバインディングにも一致しなかったメッセージは破棄されるようです。複数のバインディングと一致する場合は、最後の
バインディングとなるキューに配信されるようです。
このあたりは、実際の動作を見た方がよさそうですね。
また、Topic Exchangeはバインディングキーの指定によっては、他のExchangeと同じように使えるようです。
バインディングキーを#
のみとした場合は、Fanout Exchangeと同じ動作になるようです。ワイルドカードを含まない
バインディングキーとした場合は、Direct Exchangeと同じ動作になるようです。
では、このあたりの動作をNode.js+TypeScriptで試してみます。
環境
今回の環境は、こちら。
$ sudo -u rabbitmq rabbitmqctl version 3.12.8
RabbitMQは、172.17.0.2で動作しているものとします。
$ sudo -u rabbitmq rabbitmqctl add_user kazuhira password $ sudo -u rabbitmq rabbitmqctl set_permissions -p / kazuhira '.*' '.*' '.*' $ sudo -u rabbitmq rabbitmqctl set_user_tags kazuhira monitoring
使用するNode.jsのバージョン。
$ node --version v18.18.2 $ npm --version 9.8.1
準備
まずはNode.jsプロジェクトを作成。
$ npm init -y $ npm i -D typescript $ npm i -D @types/node@v18 $ npm i -D prettier $ npm i -D jest @types/jest $ npm i -D esbuild esbuild-jest
確認は、テストコードで行います。
RabbitMQに接続するためのamqplibとその型定義をインストール。
$ npm i amqplib $ npm i -D @types/amqplib
$ mkdir src test
依存関係は、こうなりました。
"devDependencies": { "@types/amqplib": "^0.10.4", "@types/jest": "^29.5.8", "@types/node": "^18.18.9", "esbuild": "^0.19.5", "esbuild-jest": "^0.5.0", "jest": "^29.7.0", "prettier": "^3.0.3", "typescript": "^5.2.2" }, "dependencies": { "amqplib": "^0.10.3" }
scripts
。
"scripts": { "build": "tsc --project .", "build:watch": "tsc --project . --watch", "typecheck": "tsc --project ./tsconfig.typecheck.json", "typecheck:watch": "tsc --project ./tsconfig.typecheck.json --watch", "test": "jest", "format": "prettier --write src test" },
各種設定ファイル。
tsconfig.json
{ "compilerOptions": { "target": "esnext", "module": "commonjs", "moduleResolution": "node", "lib": ["esnext"], "baseUrl": "./src", "outDir": "dist", "strict": true, "forceConsistentCasingInFileNames": true, "noFallthroughCasesInSwitch": true, "noImplicitOverride": true, "noImplicitReturns": true, "noPropertyAccessFromIndexSignature": true, "esModuleInterop": true }, "include": [ "src" ] }
tsconfig.typecheck.json
{ "extends": "./tsconfig", "compilerOptions": { "baseUrl": "./", "noEmit": true }, "include": [ "src", "test" ] }
.prettierrc.json
{ "singleQuote": true, "printWidth": 120 }
jest.config.js
module.exports = { testEnvironment: 'node', transform: { "^.+\\.tsx?$": "esbuild-jest" } };
RabbitMQのJavaScriptチュートリアルの「Topics」をTypeScriptとJestで試す
では、こちらをTypeScriptとJestで試していきます。
RabbitMQ tutorial - Topics — RabbitMQ
チュートリアル内ではプロデューサーとコンシューマーをそれぞれ別プロセスで起動していますが、今回はテストコード内で表現して
みます。
プロデューサーを作成する
まずはプロデューサーから。クラスとして作成します。
src/Producer.ts
import amqp from 'amqplib'; import { log } from './log'; export class Producer { private conn: amqp.Connection; private channel: amqp.Channel; private exchange: string; constructor(conn: amqp.Connection, channel: amqp.Channel, exchange: string) { this.conn = conn; this.channel = channel; this.exchange = exchange; } static async create(url: string, exchange: string): Promise<Producer> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); await channel.assertExchange(exchange, 'topic', { durable: false }); return new Producer(conn, channel, exchange); } send(routingKey: string, message: string): void { this.channel.publish(this.exchange, routingKey, Buffer.from(message)); log(` [x] Send '${message}'`); } async close(): Promise<void> { await this.conn.close(); } }
Topic Exchangeを作成。
static async create(url: string, exchange: string): Promise<Producer> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); await channel.assertExchange(exchange, 'topic', { durable: false }); return new Producer(conn, channel, exchange); }
Channel-oriented API reference / API reference / Channel / assertExchange
メッセージをExchangeに送信する箇所は、こちら。
send(routingKey: string, message: string): void { this.channel.publish(this.exchange, routingKey, Buffer.from(message)); log(` [x] Send '${message}'`); }
Channel#publish
の第2引数にルーティングキーを指定します。
Channel-oriented API reference / API reference / Channel / publish
このルーティングキーで、メッセージを受信するコンシューマーが決まります。ルーティングキーにマッチするコンシューマーは、
複数あってかまいません。
ちなみに、log
というのはこういう定義です。
src/log.ts
export function log(message: string): void { console.log(`[${new Date().toISOString()}] ${message}`); }
こちらはコンシューマーでも使います。
コンシューマーを作成する
次は、コンシューマーを作成します。
src/Consumer.ts
import amqp from 'amqplib'; import { log } from './log'; export class Consumer { private name: string; private conn: amqp.Connection; private channel: amqp.Channel; private exchange: string; private queue: string; private routingKeys: string[]; private receivedMessages: string[] = []; constructor( name: string, conn: amqp.Connection, channel: amqp.Channel, exchange: string, queue: string, routingKeys: string[], ) { this.name = name; this.conn = conn; this.channel = channel; this.exchange = exchange; this.queue = queue; this.routingKeys = routingKeys; } static async create(name: string, url: string, exchange: string, routingKeys: string[]): Promise<Consumer> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); await channel.assertExchange(exchange, 'topic', { durable: false }); const queue = await channel.assertQueue('', { exclusive: true }); return new Consumer(name, conn, channel, exchange, queue.queue, routingKeys); } async start(): Promise<void> { for (const routingKey of this.routingKeys) { await this.channel.bindQueue(this.queue, this.exchange, routingKey); } await this.channel.consume( this.queue, async (message) => { const messageAsString = message!.content.toString(); log(` [${this.name} - ${message?.fields.routingKey}] Received, ${messageAsString}`); this.receivedMessages.push(messageAsString); }, { noAck: true, }, ); log(` [${this.name} - [${this.routingKeys.join(', ')}]] waiting for message, for queue[${this.queue}]`); } getReceivedMessages(): string[] { return [...this.receivedMessages]; } async close(): Promise<void> { await this.conn.close(); } }
Channel#assertExchange
まではプロデューサーと同じで、その後にChannel#assertQueue
を呼び出します。
static async create(name: string, url: string, exchange: string, routingKeys: string[]): Promise<Consumer> { const conn = await amqp.connect(url); const channel = await conn.createChannel(); await channel.assertExchange(exchange, 'topic', { durable: false }); const queue = await channel.assertQueue('', { exclusive: true }); return new Consumer(name, conn, channel, exchange, queue.queue, routingKeys); }
Channel-oriented API reference / API reference / Channel / assertQueue
Routingの時もそうだったのですが、Channel#assertQueue
でキューの名前を空文字列(''
)で指定しているところがポイントで、
こうするとamq.gen-JzTY20BRgKO-HjmUJj0wLg
のようなランダムな名前でキューが作成されます。 またexclusive
をtrue
に
しているので、これは排他的な一時キューとなり、接続がクローズされると削除されるものになります。
そしてキューにバインドします。
for (const routingKey of this.routingKeys) { await this.channel.bindQueue(this.queue, this.exchange, routingKey); }
この時、第3引数にルーティングキーを渡すことで、プロデューサーが指定したルーティングキーと一致する場合にこのコンシューマーは
キューからメッセージを受信できます。
なお、今回はひとつのコンシューマーで複数のルーティングキーをバインドできるようにしています。
メッセージの受信。
await this.channel.consume( this.queue, async (message) => { const messageAsString = message!.content.toString(); log(` [${this.name} - ${message?.fields.routingKey}] Received, ${messageAsString}`); this.receivedMessages.push(messageAsString); }, { noAck: true, }, );
どのルーティングキーによるメッセージなのかは、受信したメッセージの中身で判別できます。
noAck
をtrue
にしているので、メッセージを受信したらその時点でブローカーからメッセージを削除します。
これでプロデューサーとコンシューマーの準備は完了です。
テストコードで確認する
確認はテストコードで行います。
test/topics.test.ts
import { setTimeout } from 'timers/promises'; import { Consumer } from '../src/Consumer'; import { Producer } from '../src/Producer'; jest.setTimeout(10 * 1000); test('topics test', async () => { const url = 'amqp://kazuhira:password@172.17.0.2:5672'; const exchange = 'topic_logs'; const producer = await Producer.create(url, exchange); // ルーティングキー info のメッセージを受け取る const infoLevelConsumer = await Consumer.create('info-level-consumer', url, exchange, ['info']); // 任意の1単語のルーティングキーのメッセージを受け取る const wildcardWordConsumer = await Consumer.create('wildcard-word-consumer', url, exchange, ['*']); // ルーティングキー web.[任意の1単語] のメッセージを受け取る const webCategoryConsumer = await Consumer.create('web-category-consumer', url, exchange, ['web.*']); // ルーティングキー cli.info のメッセージを受け取る const cliInfoConsumer = await Consumer.create('cli-info-consumer', url, exchange, ['cli.info']); // ルーティングキー [任意の0以上の単語].info のメッセージを受け取る const allCategoryInfoConsumer = await Consumer.create('all-category-info-consumer', url, exchange, ['#.info']); // ルーティングキー web.[任意の1単語] または cli.[任意の1単語] のメッセージを受け取る const webAndCliCategoryConsumer = await Consumer.create('web-and-cli-category-consumer', url, exchange, [ 'web.*', 'cli.*', ]); try { await infoLevelConsumer.start(); await wildcardWordConsumer.start(); await webCategoryConsumer.start(); await cliInfoConsumer.start(); await allCategoryInfoConsumer.start(); await webAndCliCategoryConsumer.start(); // routing key, info producer.send('info', 'Info Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual([]); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([]); // routing key, warn producer.send('warn', 'Warn Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual([]); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([]); // routing key, web.info producer.send('web.info', '[Web] Info Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message', '[Web] Info Message']); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']); // routing key, cli.warn producer.send('cli.warn', '[Cli] Warn Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message', '[Web] Info Message']); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message', '[Cli] Warn Message']); // routing key, cli.info producer.send('cli.info', '[Cli] Info Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual(['[Cli] Info Message']); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual([ 'Info Message', '[Web] Info Message', '[Cli] Info Message', ]); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([ '[Web] Info Message', '[Cli] Warn Message', '[Cli] Info Message', ]); // routing key, category.sub-category.info producer.send('category.sub-category.info', '[Category / Sub Category] Info Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual(['[Cli] Info Message']); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual([ 'Info Message', '[Web] Info Message', '[Cli] Info Message', '[Category / Sub Category] Info Message', ]); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([ '[Web] Info Message', '[Cli] Warn Message', '[Cli] Info Message', ]); } finally { await producer.close(); await infoLevelConsumer.close(); await wildcardWordConsumer.close(); await webCategoryConsumer.close(); await cliInfoConsumer.close(); await allCategoryInfoConsumer.close(); await webAndCliCategoryConsumer.close(); } });
第4引数の配列がルーティングキーです。お題はチュートリアルと似ているのですが、カテゴリごとのログレベルにしています。
// ルーティングキー info のメッセージを受け取る const infoLevelConsumer = await Consumer.create('info-level-consumer', url, exchange, ['info']); // 任意の1単語のルーティングキーのメッセージを受け取る const wildcardWordConsumer = await Consumer.create('wildcard-word-consumer', url, exchange, ['*']); // ルーティングキー web.[任意の1単語] のメッセージを受け取る const webCategoryConsumer = await Consumer.create('web-category-consumer', url, exchange, ['web.*']); // ルーティングキー cli.info のメッセージを受け取る const cliInfoConsumer = await Consumer.create('cli-info-consumer', url, exchange, ['cli.info']); // ルーティングキー [任意の0以上の単語].info のメッセージを受け取る const allCategoryInfoConsumer = await Consumer.create('all-category-info-consumer', url, exchange, ['#.info']); // ルーティングキー web.[任意の1単語] または cli.[任意の1単語] のメッセージを受け取る const webAndCliCategoryConsumer = await Consumer.create('web-and-cli-category-consumer', url, exchange, [ 'web.*', 'cli.*', ]);
コンシューマーが受け取るメッセージのルールは、コメントに書いています。
ここで、ワイルドカードには2種類あったことを思い出しましょう。
*
… ひとつの単語の代わりに使用できる#
… 0以上の単語を置き換えることができる
あとはコンシューマーを開始して、ルーティングキーを指定してメッセージを送り、確認していきます。
ルーティングキーinfo
。
// routing key, info producer.send('info', 'Info Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual([]); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([]);
ルーティングキーwarn
。
// routing key, warn producer.send('warn', 'Warn Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual([]); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([]);
ルーティングキーweb.info
。
// routing key, web.info producer.send('web.info', '[Web] Info Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message', '[Web] Info Message']); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']);
ルーティングキーcli.warn
。
// routing key, cli.warn producer.send('cli.warn', '[Cli] Warn Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual([]); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual(['Info Message', '[Web] Info Message']); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message', '[Cli] Warn Message']);
ルーティングキーcli.info
。
// routing key, cli.info producer.send('cli.info', '[Cli] Info Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual(['[Cli] Info Message']); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual([ 'Info Message', '[Web] Info Message', '[Cli] Info Message', ]); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([ '[Web] Info Message', '[Cli] Warn Message', '[Cli] Info Message', ]);
ルーティングキーcategory.sub-category.info
。
// routing key, category.sub-category.info producer.send('category.sub-category.info', '[Category / Sub Category] Info Message'); await setTimeout(1 * 1000); expect(infoLevelConsumer.getReceivedMessages()).toStrictEqual(['Info Message']); expect(wildcardWordConsumer.getReceivedMessages()).toStrictEqual(['Info Message', 'Warn Message']); expect(webCategoryConsumer.getReceivedMessages()).toStrictEqual(['[Web] Info Message']); expect(cliInfoConsumer.getReceivedMessages()).toStrictEqual(['[Cli] Info Message']); expect(allCategoryInfoConsumer.getReceivedMessages()).toStrictEqual([ 'Info Message', '[Web] Info Message', '[Cli] Info Message', '[Category / Sub Category] Info Message', ]); expect(webAndCliCategoryConsumer.getReceivedMessages()).toStrictEqual([ '[Web] Info Message', '[Cli] Warn Message', '[Cli] Info Message', ]);
こんな感じで、メッセージ送信時に指定するルーティングキーと、受信側で受け取れるルーティングキーの指定でどのようにメッセージが
振り分けられていくのかを確認します。
テストを実行。
$ npm test
出力されるログは、こんな感じになります。
> topics@1.0.0 test > jest console.log [2023-11-12T13:59:38.164Z] [info-level-consumer - [info]] waiting for message, for queue[amq.gen-OJspjJWyM8gOVf3cALgU8A] at log (src/log.ts:24:11) console.log [2023-11-12T13:59:38.185Z] [wildcard-word-consumer - [*]] waiting for message, for queue[amq.gen-tKBwn1jMIRsTnPyFOdBxug] at log (src/log.ts:24:11) console.log [2023-11-12T13:59:38.189Z] [web-category-consumer - [web.*]] waiting for message, for queue[amq.gen-5dDYztuTqk0D9nO1lqmTUQ] at log (src/log.ts:24:11) console.log [2023-11-12T13:59:38.193Z] [cli-info-consumer - [cli.info]] waiting for message, for queue[amq.gen-ahlQp-YTm6DWOOWi_wEFZA] at log (src/log.ts:24:11) console.log [2023-11-12T13:59:38.197Z] [all-category-info-consumer - [#.info]] waiting for message, for queue[amq.gen-f6KPc3QLIVgNq0Qviej8HA] at log (src/log.ts:24:11) console.log [2023-11-12T13:59:38.201Z] [web-and-cli-category-consumer - [web.*, cli.*]] waiting for message, for queue[amq.gen-DzAdFyAn0vc3-r4hHNwhWw] at log (src/log.ts:24:11) console.log [2023-11-12T13:59:38.204Z] [x] Send 'Info Message' at log (src/log.ts:24:11) console.log [2023-11-12T13:59:38.208Z] [wildcard-word-consumer - info] Received, Info Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:38.211Z] [all-category-info-consumer - info] Received, Info Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:38.212Z] [info-level-consumer - info] Received, Info Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:39.213Z] [x] Send 'Warn Message' at log (src/log.ts:24:11) console.log [2023-11-12T13:59:39.217Z] [wildcard-word-consumer - warn] Received, Warn Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:40.220Z] [x] Send '[Web] Info Message' at log (src/log.ts:24:11) console.log [2023-11-12T13:59:40.225Z] [web-and-cli-category-consumer - web.info] Received, [Web] Info Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:40.229Z] [all-category-info-consumer - web.info] Received, [Web] Info Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:40.233Z] [web-category-consumer - web.info] Received, [Web] Info Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:41.228Z] [x] Send '[Cli] Warn Message' at log (src/log.ts:24:11) console.log [2023-11-12T13:59:41.232Z] [web-and-cli-category-consumer - cli.warn] Received, [Cli] Warn Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:42.235Z] [x] Send '[Cli] Info Message' at log (src/log.ts:24:11) console.log [2023-11-12T13:59:42.240Z] [all-category-info-consumer - cli.info] Received, [Cli] Info Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:42.246Z] [cli-info-consumer - cli.info] Received, [Cli] Info Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:42.251Z] [web-and-cli-category-consumer - cli.info] Received, [Cli] Info Message at log (src/log.ts:24:11) console.log [2023-11-12T13:59:43.247Z] [x] Send '[Category / Sub Category] Info Message' at log (src/log.ts:24:11) console.log [2023-11-12T13:59:43.251Z] [all-category-info-consumer - category.sub-category.info] Received, [Category / Sub Category] Info Message at log (src/log.ts:24:11) PASS test/topics.test.ts (6.847 s) ✓ topics test (6628 ms) Test Suites: 1 passed, 1 total Tests: 1 passed, 1 total Snapshots: 0 total Time: 6.9 s, estimated 7 s Ran all test suites.
OKですね。
また、テスト実行中に以下のコマンドを実行してみます。
$ sudo -u rabbitmq rabbitmqctl list_exchanges $ sudo -u rabbitmq rabbitmqctl list_queues $ sudo -u rabbitmq rabbitmqctl list_bindings
結果。
$ sudo -u rabbitmq rabbitmqctl list_exchanges Listing exchanges for vhost / ... name type amq.direct direct direct amq.fanout fanout amq.topic topic amq.match headers amq.rabbitmq.trace topic amq.headers headers topic_logs topic $ sudo -u rabbitmq rabbitmqctl list_queues Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages amq.gen-inw-ulcTsS2vXTcsV5oPfQ 0 amq.gen-AKmcQWVbO8w69uoGF_2IZQ 0 amq.gen-kuMaXrOg9yoMRtMeOWpu8g 0 amq.gen-QDLMnCjnkUvbvM36OywdBw 0 amq.gen-WnRhEe5b8ATpBrhFWXkn6A 0 amq.gen-HK7x22ChT_KEudcQwgvsAQ 0 $ sudo -u rabbitmq rabbitmqctl list_bindings Listing bindings for vhost /... source_name source_kind destination_name destination_kind routing_key arguments exchange amq.gen-inw-ulcTsS2vXTcsV5oPfQ queue amq.gen-inw-ulcTsS2vXTcsV5oPfQ [] exchange amq.gen-AKmcQWVbO8w69uoGF_2IZQ queue amq.gen-AKmcQWVbO8w69uoGF_2IZQ [] exchange amq.gen-kuMaXrOg9yoMRtMeOWpu8g queue amq.gen-kuMaXrOg9yoMRtMeOWpu8g [] exchange amq.gen-QDLMnCjnkUvbvM36OywdBw queue amq.gen-QDLMnCjnkUvbvM36OywdBw [] exchange amq.gen-WnRhEe5b8ATpBrhFWXkn6A queue amq.gen-WnRhEe5b8ATpBrhFWXkn6A [] exchange amq.gen-HK7x22ChT_KEudcQwgvsAQ queue amq.gen-HK7x22ChT_KEudcQwgvsAQ [] topic_logs exchange amq.gen-QDLMnCjnkUvbvM36OywdBw queue #.info [] topic_logs exchange amq.gen-inw-ulcTsS2vXTcsV5oPfQ queue * [] topic_logs exchange amq.gen-HK7x22ChT_KEudcQwgvsAQ queue cli.* [] topic_logs exchange amq.gen-AKmcQWVbO8w69uoGF_2IZQ queue cli.info [] topic_logs exchange amq.gen-WnRhEe5b8ATpBrhFWXkn6A queue info [] topic_logs exchange amq.gen-HK7x22ChT_KEudcQwgvsAQ queue web.* [] topic_logs exchange amq.gen-kuMaXrOg9yoMRtMeOWpu8g queue web.* []
Topic Exchangeがあり
topic_logs topic
一時キューがあり
amq.gen-inw-ulcTsS2vXTcsV5oPfQ 0 amq.gen-AKmcQWVbO8w69uoGF_2IZQ 0 amq.gen-kuMaXrOg9yoMRtMeOWpu8g 0 amq.gen-QDLMnCjnkUvbvM36OywdBw 0 amq.gen-WnRhEe5b8ATpBrhFWXkn6A 0 amq.gen-HK7x22ChT_KEudcQwgvsAQ 0
Exchangeと一時キューの紐付けも確認できます。
source_name source_kind destination_name destination_kind routing_key arguments exchange amq.gen-inw-ulcTsS2vXTcsV5oPfQ queue amq.gen-inw-ulcTsS2vXTcsV5oPfQ [] exchange amq.gen-AKmcQWVbO8w69uoGF_2IZQ queue amq.gen-AKmcQWVbO8w69uoGF_2IZQ [] exchange amq.gen-kuMaXrOg9yoMRtMeOWpu8g queue amq.gen-kuMaXrOg9yoMRtMeOWpu8g [] exchange amq.gen-QDLMnCjnkUvbvM36OywdBw queue amq.gen-QDLMnCjnkUvbvM36OywdBw [] exchange amq.gen-WnRhEe5b8ATpBrhFWXkn6A queue amq.gen-WnRhEe5b8ATpBrhFWXkn6A [] exchange amq.gen-HK7x22ChT_KEudcQwgvsAQ queue amq.gen-HK7x22ChT_KEudcQwgvsAQ [] topic_logs exchange amq.gen-QDLMnCjnkUvbvM36OywdBw queue #.info [] topic_logs exchange amq.gen-inw-ulcTsS2vXTcsV5oPfQ queue * [] topic_logs exchange amq.gen-HK7x22ChT_KEudcQwgvsAQ queue cli.* [] topic_logs exchange amq.gen-AKmcQWVbO8w69uoGF_2IZQ queue cli.info [] topic_logs exchange amq.gen-WnRhEe5b8ATpBrhFWXkn6A queue info [] topic_logs exchange amq.gen-HK7x22ChT_KEudcQwgvsAQ queue web.* [] topic_logs exchange amq.gen-kuMaXrOg9yoMRtMeOWpu8g queue web.* []
ルーティングキーもわかりますね。
こんなところでしょうか。
おわりに
RabbitMQのJavaScriptチュートリアルの「Topics」を、TypeScriptで試してみました。
前回からまた時間が空いたので、少し前のエントリーを見ながら思い出しつつやってみました。ワイルドカードの扱いをコロッと
忘れたりしていましたが、なんとか確認できる形にはなったかなと思います。
「Routing」と混ざりやすい気がしないでもないですが、いろんなことができますね…。