これは、なにをしたくて書いたもの?
Node.jsでマルチプロセス、それからNode.js 10.5.0以降であればマルチスレッドが使えるようなので、興味がてら試してみることに
しました。
Node.jsでマルチプロセス、マルチスレッド
Node.jsでマルチプロセスを扱うには、clusterモジュールまたはchild_processモジュールを使うようです。
Cluster | Node.js v18.15.0 Documentation
Child process | Node.js v18.15.0 Documentation
clusterモジュールは、内部的にはchild_processモジュールを使っているようです。
The worker processes are spawned using the child_process.fork() method, so that they can communicate with the parent via IPC and pass server handles back and forth.
マルチスレッドを扱うには、worker_threadsモジュールを使うようです。こちらは、Node.js 10.5.0から利用可能になったものですね。
Worker threads | Node.js v18.15.0 Documentation
child_processモジュール
最初はマルチプロセスから、そしてclusterモジュールはchild_processモジュールをベースにしているようなので、
まずはchild_processモジュールを見てみましょう。
child_processモジュールは、子プロセスを生成するモジュールです。
The node:child_process module provides the ability to spawn subprocesses in a manner that is similar, but not identical, to popen(3). This capability is primarily provided by the child_process.spawn() function:
使うメソッドはいくつかありますが、ざっくり以下の3つの系統に分かれます。
- spawn … 指定されたコマンドを実行する。引数は配列で指定
- exec … OSのシェルを介して指定されたコマンドを実行する。引数はスペース区切りで指定
- fork … 実行するモジュールを指定して、新しいNode.jsプロセスを生成する。親プロセスと子プロセスの間はIPC(プロセス間通信)ができる準備が整えられている
上記はサブプロセスが非同期に実行されるのですが、spawnとexecについては同期版もあります。
forkで作成した子プロセスは、IPC(Internal Process Communication、プロセス間通信)でメッセージを送信し合うことができます。
Child process / subprocess.send(message[, sendHandle[, options]][, callback])
メッセージはシリアライズして送信することになります。デフォルトではJSON型式の文字列でシリアライズします。
serialization
Specify the kind of serialization used for sending messages between processes. Possible values are 'json' and 'advanced'. See Advanced serialization for more details. Default: 'json'.
Child process / child_process.fork(modulePath[, args][, options])
また、ちょっと変わったケースとしてTCPのServer
オブジェクトやSocket
オブジェクトを直接渡すこともできます。
- Child process / subprocess.send(message[, sendHandle[, options]][, callback]) / Example: sending a server object
- Child process / subprocess.send(message[, sendHandle[, options]][, callback]) / Example: sending a socket object
マルチプロセスなTCPサーバーを作る用途が意識されてそうな感じですね。
clusterモジュール
clusterモジュールは、child_processモジュールのfork
を使って実現されるものです。つまり、新しいNode.jsプロセスを生成することに
なります。
The worker processes are spawned using the child_process.fork() method, so that they can communicate with the parent via IPC and pass server handles back and forth.
よってIPCを使って子プロセスとやり取りすることができます。
以下のように、ネットワーク接続を受け付けるようなアプリケーションで使われることを意識していますね。
The cluster module supports two methods of distributing incoming connections.
The first one (and the default one on all platforms except Windows) is the round-robin approach, where the primary process listens on a port, accepts new connections and distributes them across the workers in a round-robin fashion, with some built-in smarts to avoid overloading a worker process.
Although a primary use case for the node:cluster module is networking, it can also be used for other use cases requiring worker processes.
生成されたプロセス自体を管理するのはアプリケーション側の役割で、サブプロセスの数の管理をNode.jsで行うようなことはありません。
Node.js does not automatically manage the number of workers, however. It is the application's responsibility to manage the worker pool based on its own needs.
また、サンプルに書かれているようにサーバーが利用するポートをプロセス間で簡単に共有することができます。
import cluster from 'node:cluster'; import http from 'node:http'; import { availableParallelism } from 'node:os'; import process from 'node:process'; const numCPUs = availableParallelism(); if (cluster.isPrimary) { console.log(`Primary ${process.pid} is running`); // Fork workers. for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', (worker, code, signal) => { console.log(`worker ${worker.process.pid} died`); }); } else { // Workers can share any TCP connection // In this case it is an HTTP server http.createServer((req, res) => { res.writeHead(200); res.end('hello world\n'); }).listen(8000); console.log(`Worker ${process.pid} started`); }
こんな感じで子プロセス側で同じポートをリッスンするServer
を作成しているのですが、これで同じポートをリッスンする複数プロセスの
Node.jsアプリケーションが動作するんですよね。
http.createServer((req, res) => { res.writeHead(200); res.end('hello world\n'); }).listen(8000);
worker_threadsモジュール
Node.jsでマルチスレッドを扱うには、worker_threadsモジュールを使います。
Worker threads | Node.js v18.15.0 Documentation
ワーカースレッドは、CPUバウンドな処理を実行するのに役立ち、IOバウンドな処理には効果的ではないと書かれています。IO操作に
ついては、Node.jsの非同期IOの方がワーカースレッドを使うよりも効率的だそうです。
Workers (threads) are useful for performing CPU-intensive JavaScript operations. They do not help much with I/O-intensive work. The Node.js built-in asynchronous I/O operations are more efficient than Workers can be.
またclusterモジュールやchild_processモジュールと異なり、worker_threadsモジュールではワーカースレッドとメモリを共有できます。
Unlike child_process or cluster, worker_threads can share memory. They do so by transferring ArrayBuffer instances or sharing SharedArrayBuffer instances.
なお、スレッドプールの機能はありません。AsyncResourceを使って実装しているサンプルがあるよ、と書かれています。
When implementing a worker pool, use the AsyncResource API to inform diagnostic tools (e.g. to provide asynchronous stack traces) about the correlation between tasks and their outcomes. See "Using AsyncResource for a Worker thread pool" in the async_hooks documentation for an example implementation.
こちらですね。
Asynchronous context tracking / Class: AsyncResource / Using AsyncResource for a Worker thread pool
要するに自分で作ってね、と…。
お題
マルチプロセスを使っても、マルチスレッドを使っても、CPU数でスケールすることを確認できればよいので、今回はフィボナッチ数の
計算を行うHTTPサーバーをそれぞれのモジュールを使って試してみたいと思います。
HTTPサーバー自体は、こちらを使って作成します。
HTTP | Node.js v18.15.0 Documentation
また、プログラムはTypeScriptで書くことにします。
環境
今回の環境は、こちら。
$ node --version v18.15.0 $ npm --version 9.5.0
準備
Node.js+TypeScriptの環境を作成します。
$ npm init -y $ npm i -D typescript $ npm i -D @types/node@v18 $ npm i -D prettier $ mkdir src
依存関係はこんな感じになりました。
"devDependencies": { "@types/node": "^18.15.3", "prettier": "^2.8.5", "typescript": "^5.0.2" }
scripts
。
"scripts": { "build": "tsc --project .", "build:watch": "tsc --project . --watch", "format": "prettier --write src" },
設定ファイル。
tsconfig.json
{ "compilerOptions": { "target": "esnext", "module": "commonjs", "moduleResolution": "node", "lib": ["esnext"], "baseUrl": "./src", "outDir": "dist", "strict": true, "forceConsistentCasingInFileNames": true, "noFallthroughCasesInSwitch": true, "noImplicitOverride": true, "noImplicitReturns": true, "noPropertyAccessFromIndexSignature": true, "esModuleInterop": true }, "include": [ "src" ] }
.prettierrc.json
{ "singleQuote": true, "printWidth": 120 }
まずはシンプルに
まずはマルチプロセスもマルチスレッドも使わないバージョンを作成します。
この後、種々のモジュールを使って作成するプログラムでも扱いやすいように、いくつかのファイルに分割して書いていきます。
フィボナッチ数の計算は、http://localhost:8000?num=[整数]
のような指定で行うものとします。
フィボナッチ数の計算を行う関数。
src/fib.ts
export function fib(n: number): number { if (n === 0) { return 0; } else if (n === 1) { return 1; } else { return fib(n - 1) + fib(n - 2); } }
QueryStringをMap
に分解したり、その内容をfib
関数に渡すための関数。
src/handler.ts
import { fib } from './fib'; export function fibRequestHandler(params: Map<string, string>): number | undefined { let result; if (params.has('num')) { result = fib(parseInt(params.get('num')!, 10)); } else { result = undefined; } return result; } export function queryToMap(query: string | undefined): Map<string, string> { const map = new Map<string, string>(); if (!query) { return map; } for (const pair of query.split('&')) { const [name, value] = pair.split('='); map.set(name, value); } return map; }
ログ出力。
src/log.ts
export function log(message: string): void { console.log(`[${new Date().toISOString()}] ${message}`); }
シャットダウン。
src/shutdown.ts
import http from 'node:http'; import { log } from './log'; export function registerShutdown(httpServer: http.Server): void { process.on('SIGTERM', signalHandler(httpServer)); process.on('SIGINT', signalHandler(httpServer)); } function signalHandler(httpServer: http.Server): (signal: NodeJS.Signals) => void { return (signal) => { httpServer.close(() => log(`shutdown, signal: ${signal}`)); }; }
ここまでが基本的な部品ですね。
では、サーバー部分を書きます。
src/simple-http-server.ts
import http from 'node:http'; import { fibRequestHandler, queryToMap } from './handler'; import { log } from './log'; import { registerShutdown } from './shutdown'; const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => { const query = req.url?.split('?')[1]; const params = queryToMap(query); log(`accept request, pid = ${process.pid}`); const result = fibRequestHandler(params); res.writeHead(200, { 'Content-Type': 'application/json' }); res.end( JSON.stringify({ result, url: req.url, pid: process.pid, }) ); }); httpServer.listen( { host: '0.0.0.0', port: 8000, }, () => registerShutdown(httpServer) ); log(`http-server, pid: ${process.pid}`);
ビルドして
$ npm run build
起動。
$ node dist/simple-http-server.js [2023-03-21T13:59:12.290Z] http-server, pid: 27141
確認。
$ curl localhost:8000?num=10 {"result":55,"url":"/?num=10","pid":27141}
こんな感じで返ってきます。
手元の環境では、42くらいがちょうどいい時間がかかりました。
$ time curl localhost:8000?num=42 {"result":267914296,"url":"/?num=42","pid":27141} real 0m3.698s user 0m0.009s sys 0m0.003s
この時、mpstat
で見ると
$ mpstat -P ALL 1
CPUをひとつ使い切っていることがわかります。
23時02分08秒 CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle 23時02分09秒 all 13.32 0.00 0.00 0.13 0.00 0.00 0.00 0.00 0.00 86.56 23時02分09秒 0 1.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 99.00 23時02分09秒 1 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 23時02分09秒 2 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00 23時02分09秒 3 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00 23時02分09秒 4 3.03 0.00 0.00 1.01 0.00 0.00 0.00 0.00 0.00 95.96 23時02分09秒 5 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00 23時02分09秒 6 1.98 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 98.02 23時02分09秒 7 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00
では、これを連続で4回実行してみます。
$ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42
リクエストを受け付けた時のログ。
[2023-03-21T14:01:04.742Z] accept request, pid = 27141 [2023-03-21T14:02:06.498Z] accept request, pid = 27141 [2023-03-21T14:03:17.756Z] accept request, pid = 27141 [2023-03-21T14:03:21.562Z] accept request, pid = 27141
結果。
$ {"result":267914296,"url":"/?num=42","pid":27141} real 0m3.814s user 0m0.004s sys 0m0.005s {"result":267914296,"url":"/?num=42","pid":27141} real 0m7.285s user 0m0.000s sys 0m0.006s {"result":267914296,"url":"/?num=42","pid":27141} real 0m10.876s user 0m0.007s sys 0m0.000s {"result":267914296,"url":"/?num=42","pid":27141} real 0m15.284s user 0m0.004s sys 0m0.004s
ほぼ線形に遅くなっています。
使えるCPUコアはひとつだからですね。
23時04分25秒 CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle 23時04分26秒 all 12.96 0.00 0.13 0.00 0.00 0.00 0.00 0.00 0.00 86.92 23時04分26秒 0 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 23時04分26秒 1 1.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 99.00 23時04分26秒 2 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00 23時04分26秒 3 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00 23時04分26秒 4 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00 23時04分26秒 5 1.01 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 98.99 23時04分26秒 6 1.01 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 98.99 23時04分26秒 7 0.00 0.00 0.99 0.00 0.00 0.00 0.00 0.00 0.00 99.01
Ctrl-cで停止。
[2023-03-21T14:05:04.163Z] shutdown, signal: SIGINT
これを、マルチプロセスやマルチスレッドを使って変えていきましょう。
また、以降はTypeScriptソースコードをビルドするステップは省略します。
clusterモジュールで書く
最初はclusterモジュールを使って書いてみます。
src/prefork-http-server.ts
import cluster from 'node:cluster'; import http from 'node:http'; import { fibRequestHandler, queryToMap } from './handler'; import { log } from './log'; import { registerShutdown } from './shutdown'; const processes = process.argv[2] !== undefined ? parseInt(process.argv[2], 10) : 4; if (cluster.isPrimary) { // main process for (let i = 0; i < processes; i++) { const subProcess = cluster.fork(); log(`fork process, pid: ${process.pid}, sub-process pid: ${subProcess.process.pid}`); } log(`prefork, ${processes} processes`); } else { // sub process const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => { const query = req.url?.split('?')[1]; const params = queryToMap(query); log(`sub-process accept request, pid = ${process.pid}`); const result = fibRequestHandler(params); res.writeHead(200, { 'Content-Type': 'application/json' }); res.end( JSON.stringify({ result, url: req.url, pid: process.pid, }) ); }); httpServer.listen({ host: '0.0.0.0', port: 8000, }, () => registerShutdown(httpServer)); log(`start prefork http-server, pid: ${process.pid}`); }
起動するプロセス数は、4または起動引数で指定した数にしました。
const processes = process.argv[2] !== undefined ? parseInt(process.argv[2], 10) : 4;
指定数のプロセスを、最初にfork
。
for (let i = 0; i < processes; i++) { const subProcess = cluster.fork(); log(`fork process, pid: ${process.pid}, sub-process pid: ${subProcess.process.pid}`); }
fork
すると、Node.jsを起動した時と同じ処理が呼び出されるようで(fork
呼び出したソースコードではなく)、自分がfork元か
そうでないかとisPrimary
で判定することになります。
if (cluster.isPrimary) { // main process ... } else { // sub process ... }
これだけで簡単にマルチプロセスかすることができます。
子プロセスでHTTPサーバーをそれぞれ作って同じポートをリッスンするように書いていますが、これはプロセス間で共有するらしく
問題なく動きます。
// sub process const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => { const query = req.url?.split('?')[1]; const params = queryToMap(query); log(`sub-process accept request, pid = ${process.pid}`); const result = fibRequestHandler(params); res.writeHead(200, { 'Content-Type': 'application/json' }); res.end( JSON.stringify({ result, url: req.url, pid: process.pid, }) ); }); httpServer.listen({ host: '0.0.0.0', port: 8000, }, () => registerShutdown(httpServer));
ちょっと不思議ですね。
ビルドして起動。
$ node dist/prefork-http-server.js [2023-03-21T14:34:11.205Z] fork process, pid: 28781, sub-process pid: 28788 [2023-03-21T14:34:11.209Z] fork process, pid: 28781, sub-process pid: 28789 [2023-03-21T14:34:11.211Z] fork process, pid: 28781, sub-process pid: 28795 [2023-03-21T14:34:11.213Z] fork process, pid: 28781, sub-process pid: 28796 [2023-03-21T14:34:11.213Z] prefork, 4 processes [2023-03-21T14:34:11.251Z] start prefork http-server, pid: 28788 [2023-03-21T14:34:11.257Z] start prefork http-server, pid: 28789 [2023-03-21T14:34:11.259Z] start prefork http-server, pid: 28795 [2023-03-21T14:34:11.263Z] start prefork http-server, pid: 28796
4つの子プロセスが起動されます。ps
で確認すると、確かに増えています。
xxxxx 28781 9821 0 23:34 pts/4 00:00:00 node dist/prefork-http-server.js xxxxx 28788 28781 0 23:34 pts/4 00:00:00 /path/to/node /path/to/dist/prefork-http-server.js xxxxx 28789 28781 0 23:34 pts/4 00:00:00 /path/to/node /path/to/dist/prefork-http-server.js xxxxx 28795 28781 0 23:34 pts/4 00:00:00 /path/to/node /path/to/dist/prefork-http-server.js xxxxx 28796 28781 0 23:34 pts/4 00:00:00 /path/to/node /path/to/dist/prefork-http-server.js
先程のように、リクエストを4つ送ってみます。
$ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42
リクエストを受け付けた時のログ。
[2023-03-21T14:37:23.300Z] sub-process accept request, pid = 28788 [2023-03-21T14:37:23.525Z] sub-process accept request, pid = 28789 [2023-03-21T14:37:23.741Z] sub-process accept request, pid = 28795 [2023-03-21T14:37:23.965Z] sub-process accept request, pid = 28796
それぞれ別のプロセスが受け付けたようです。
結果。
{"result":267914296,"url":"/?num=42","pid":28788} real 0m4.196s user 0m0.001s sys 0m0.008s {"result":267914296,"url":"/?num=42","pid":28789} real 0m4.265s user 0m0.006s sys 0m0.000s {"result":267914296,"url":"/?num=42","pid":28795} real 0m4.207s user 0m0.006s sys 0m0.000s {"result":267914296,"url":"/?num=42","pid":28796} real 0m4.239s user 0m0.007s sys 0m0.000s
それぞれ、ほぼ同じ処理時間で返ってくるようになりました。OKそうです。
mpstat
では、こんな感じに複数のCPUが使われていることが確認できます。
23時37分26秒 CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle 23時37分27秒 all 50.63 0.00 0.00 0.25 0.00 0.00 0.00 0.00 0.00 49.12 23時37分27秒 0 2.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 98.00 23時37分27秒 1 99.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 1.00 23時37分27秒 2 1.01 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 98.99 23時37分27秒 3 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 23時37分27秒 4 24.24 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 75.76 23時37分27秒 5 76.77 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 23.23 23時37分27秒 6 32.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 68.00 23時37分27秒 7 69.70 0.00 0.00 2.02 0.00 0.00 0.00 0.00 0.00 28.28
今回は4つの子プロセスで起動したので、それ以上のリクエストを同時に送信するとその場合は遅延するようになります。
こんな感じで、起動するプロセス数を絞っても確認しやすいと思います。
$ node dist/prefork-http-server.js 2 [2023-03-21T14:38:37.798Z] fork process, pid: 29087, sub-process pid: 29094 [2023-03-21T14:38:37.803Z] fork process, pid: 29087, sub-process pid: 29095 [2023-03-21T14:38:37.803Z] prefork, 2 processes [2023-03-21T14:38:37.837Z] start prefork http-server, pid: 29094 [2023-03-21T14:38:37.843Z] start prefork http-server, pid: 29095
停止。
[2023-03-21T14:38:25.955Z] shutdown, signal: SIGINT [2023-03-21T14:38:25.955Z] shutdown, signal: SIGINT [2023-03-21T14:38:25.955Z] shutdown, signal: SIGINT [2023-03-21T14:38:25.955Z] shutdown, signal: SIGINT
child_processモジュールを使う
次は、child_processモジュールを使ってみます。
今回は単純化のために、リクエストを受け付けたらfork
するようにしました。
src/fork-http-server.ts
import { fork } from 'node:child_process'; import http from 'node:http'; import { queryToMap } from './handler'; import { log } from './log'; import { registerShutdown } from './shutdown'; const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => { const subProcess = fork(`${__dirname}/fork-handler`); subProcess.on('spawn', () => { log(`sub-process start, pid: ${subProcess.pid}`); }); const query = req.url?.split('?')[1]; const params = queryToMap(query); subProcess.send(JSON.stringify(Object.fromEntries(params))); subProcess.on('message', (result) => { res.writeHead(200, { 'Content-Type': 'application/json' }); res.end( JSON.stringify({ result, url: req.url, pid: subProcess.pid, }) ); res.on('close', () => { subProcess.kill(); }); }); subProcess.on('exit', (code, signal) => { log(`sub-process exit, pid: ${subProcess.pid}, code: ${code}, signal: ${signal}`); }); }); httpServer.listen({ host: '0.0.0.0', port: 8000, }); log(`start fork http-server, pid: ${process.pid}`); registerShutdown(httpServer);
リクエストを受け付けたら、まずはfork
します。
const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => { const subProcess = fork(`${__dirname}/fork-handler`);
子プロセスに、IPCで処理を行うためのメッセージを送信。
subProcess.send(JSON.stringify(Object.fromEntries(params)));
子プロセスが計算した値を、レスポンスとして返します。
subProcess.on('message', (result) => { res.writeHead(200, { 'Content-Type': 'application/json' }); res.end( JSON.stringify({ result, url: req.url, pid: subProcess.pid, }) );
レスポンスの送信が完了したら、子プロセスを終了。
res.on('close', () => { subProcess.kill(); }); });
こんな感じで、計算処理の部分は子プロセスで、リクエストやレスポンスの読み出しなどは親プロセスで行うようにしました。
また、fork
で起動するモジュールでは親プロセスからメッセージを受け取り、メッセージを返す必要があるので以下のような
モジュールを作成しました。
src/fork-handler.ts
import { fibRequestHandler } from './handler'; import { log } from './log'; process.on('message', (paramsAsString: string) => { log(`sub-process accept request, pid = ${process.pid}`); const params = new Map<string, string>(Object.entries(JSON.parse(paramsAsString))); const result = fibRequestHandler(params); process.send!({ result }); });
プロセス間のメッセージの送信はsend
で、
- Process / process.send(message[, sendHandle[, options]][, callback])
- Child process / subprocess.send(message[, sendHandle[, options]][, callback])
受信はon('message', handler)
で行います。
わかりにくいですが、以下では親プロセスからメッセージを受け取り、処理が終わったら親プロセスにメッセージを送信しています。
process.on('message', (paramsAsString: string) => { log(`sub-process accept request, pid = ${process.pid}`); const params = new Map<string, string>(Object.entries(JSON.parse(paramsAsString))); const result = fibRequestHandler(params); process.send!({ result }); });
親プロセスから子プロセスに情報を渡し、子プロセスから結果を受け取っているのは以下の部分ですね。
subProcess.send(JSON.stringify(Object.fromEntries(params))); subProcess.on('message', (result) => { res.writeHead(200, { 'Content-Type': 'application/json' }); res.end( JSON.stringify({ result, url: req.url, pid: subProcess.pid, }) );
起動。
$ node dist/fork-http-server.js [2023-03-21T14:39:00.351Z] start fork http-server, pid: 29124
リクエストを1つ送ってみます。
$ time curl localhost:8000?num=42
ログには、こんな感じでfork
されて計算処理を行い、終わったらプロセスを終了させていることを確認できます。
[2023-03-21T14:39:37.740Z] sub-process start, pid: 29148 [2023-03-21T14:39:37.773Z] sub-process accept request, pid = 29148 [2023-03-21T14:39:41.418Z] sub-process exit, pid: 29148, code: null, signal: SIGTERM
では、リクエストを4つ送ってみます。
$ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42
結果。
{"result":{"result":267914296},"url":"/?num=42","pid":29192} real 0m4.192s user 0m0.005s sys 0m0.004s {"result":{"result":267914296},"url":"/?num=42","pid":29201} real 0m4.261s user 0m0.004s sys 0m0.002s {"result":{"result":267914296},"url":"/?num=42","pid":29210} real 0m4.251s user 0m0.003s sys 0m0.003s {"result":{"result":267914296},"url":"/?num=42","pid":29219} real 0m4.209s user 0m0.003s sys 0m0.004s
リクエストを受け付けてから、子プロセスが終了するまでのログ。
[2023-03-21T14:40:43.341Z] sub-process start, pid: 29192 [2023-03-21T14:40:43.377Z] sub-process accept request, pid = 29192 [2023-03-21T14:40:43.888Z] sub-process start, pid: 29201 [2023-03-21T14:40:43.923Z] sub-process accept request, pid = 29201 [2023-03-21T14:40:44.216Z] sub-process start, pid: 29210 [2023-03-21T14:40:44.251Z] sub-process accept request, pid = 29210 [2023-03-21T14:40:44.520Z] sub-process start, pid: 29219 [2023-03-21T14:40:44.559Z] sub-process accept request, pid = 29219 [2023-03-21T14:40:47.524Z] sub-process exit, pid: 29192, code: null, signal: SIGTERM [2023-03-21T14:40:48.144Z] sub-process exit, pid: 29201, code: null, signal: SIGTERM [2023-03-21T14:40:48.462Z] sub-process exit, pid: 29210, code: null, signal: SIGTERM [2023-03-21T14:40:48.723Z] sub-process exit, pid: 29219, code: null, signal: SIGTERM
CPUが複数使われていることを確認。
23時40分46秒 CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle 23時40分47秒 all 50.63 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 49.37 23時40分47秒 0 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00 23時40分47秒 1 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 23時40分47秒 2 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 23時40分47秒 3 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00 23時40分47秒 4 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 23時40分47秒 5 1.01 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 98.99 23時40分47秒 6 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 23時40分47秒 7 1.01 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 98.99
なお、リクエストを処理している間のプロセスを見ると、以下のようになっています。
xxxxx 29124 9821 0 23:39 pts/4 00:00:00 node dist/fork-http-server.js xxxxx 29301 29124 73 23:42 pts/4 00:00:02 /path/to/node /path/to/dist/fork-handler xxxxx 29310 29124 97 23:42 pts/4 00:00:01 /path/to/node /path/to/dist/fork-handler xxxxx 29319 29124 84 23:42 pts/4 00:00:01 /path/to/node /path/to/dist/fork-handler xxxxx 29328 29124 73 23:42 pts/4 00:00:01 /path/to/node /path/to/dist/fork-handler
リクエストが終了次第、fork
した子プロセスは終了していきます。
終了。
[2023-03-21T14:45:49.815Z] shutdown, signal: SIGINT
worker_threadsモジュールを使う
最後は、worker_threadsモジュールを使ってマルチスレッドで動作するようにしましょう。
こちらも、簡単のためにリクエストを受け付けたら新しくスレッドを起動するようにしています。
src/threaded-http-server.ts
import http from 'node:http'; import { Worker, isMainThread, threadId } from 'node:worker_threads'; import { queryToMap } from './handler'; import { log } from './log'; import { registerShutdown } from './shutdown'; log(`main-thread, pid = ${process.pid}, thread-id = ${threadId}, is-main-thread: ${isMainThread}`); const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => { const query = req.url?.split('?')[1]; const params = queryToMap(query); const worker = new Worker(`${__dirname}/worker-handler`, { workerData: params }); worker.on('online', () => { log(`worker-thread start, pid: ${process.pid}, thread-id: ${worker.threadId}, is-main-thread: ${isMainThread}`); }); worker.on('message', (result) => { res.writeHead(200, { 'Content-Type': 'application/json' }); res.end( JSON.stringify({ result, url: req.url, pid: process.pid, threadId: worker.threadId, }) ); }); worker.on('exit', () => { log(`worker-thread exit, pid: ${process.pid}, thread-id: ${worker.threadId}, is-main-thread: ${isMainThread}`); }); }); httpServer.listen( { host: '0.0.0.0', port: 8000, }, () => registerShutdown(httpServer) ); log(`start threaded http-server, pid: ${process.pid}, thread-id: ${threadId}, is-main-thread: ${isMainThread}`);
スレッドは、Worker
に対象のモジュールを指定することで起動します。
const worker = new Worker(`${__dirname}/worker-handler`, { workerData: params });
この時に、スレッドに渡すデータも指定できます。
コンストラクタ以外でも、postMessage
でも渡せそうですね。
Worker threads / new Worker(filename[, options]) / worker.postMessage(value[, transferList])
計算処理は起動したスレッドで行い、メッセージとして受け取ります。
worker.on('message', (result) => { res.writeHead(200, { 'Content-Type': 'application/json' }); res.end( JSON.stringify({ result, url: req.url, pid: process.pid, threadId: worker.threadId, }) ); });
この時にスレッドのidも返すようにしました。
この値は、プラットフォームのスレッドのidとは関係がなさそうです。
Worker threads / worker.threadId
また、現在のスレッドがメインスレッドかどうかはisMainThread
で判定することができます。今回はログ出力に使っています。
Worker threads / worker.isMainThread
isMainThread
がtrue
を返す場合は、Worker
内で動作していないことを示します。
起動したスレッドとメッセージの送受信をする必要があるので、child_processモジュールを使った時と同じようにモジュールを用意しました。
src/worker-handler.ts
import { isMainThread, parentPort, threadId, workerData } from 'node:worker_threads'; import { fibRequestHandler } from './handler'; import { log } from './log'; log(`worker-thread accept request, pid = ${process.pid}, thread-id = ${threadId}, is-main-thread: ${isMainThread}`); const params: Map<string, string> = workerData; const result = fibRequestHandler(workerData); parentPort?.postMessage(result);
Worker
作成時に渡したデータは、workerData
で参照できます。
Worker threads / worker.workerData
そして、親のスレッドへデータを渡すには、parentPort
経由でpostMessage
を使うことで行えます。
- Worker threads / worker.parentPort
- Worker threads / Class: MessagePort / port.postMessage(value[, transferList])
こちらを使うことで、起動元・先のスレッド間でデータのやり取りを行うことができます。
起動。
$ node dist/threaded-http-server.js [2023-03-21T15:13:59.056Z] main-thread, pid = 30736, thread-id = 0, is-main-thread: true [2023-03-21T15:13:59.061Z] start threaded http-server, pid: 30736, thread-id: 0, is-main-thread: true
リクエストを1つ送ってみます。
$ time curl localhost:8000?num=42 {"result":267914296,"url":"/?num=42","pid":30736,"threadId":1} real 0m3.793s user 0m0.003s sys 0m0.008s
ログには、こんな感じで別スレッドで計算処理を行い、終わったらスレッドを終了させていることを確認できます。
[2023-03-21T15:14:38.375Z] worker-thread start, pid: 30736, thread-id: 1, is-main-thread: true [2023-03-21T15:14:38.380Z] worker-thread accept request, pid = 30736, thread-id = 1, is-main-thread: false [2023-03-21T15:14:42.099Z] worker-thread exit, pid: 30736, thread-id: -1, is-main-thread: true
スレッド終了のイベントハンドリングの時には、スレッドのidがわからなくなるみたいですね。
では、リクエストを4つ送ってみます。
$ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42 $ time curl localhost:8000?num=42
結果。
{"result":267914296,"url":"/?num=42","pid":30736,"threadId":2} real 0m4.369s user 0m0.003s sys 0m0.007s {"result":267914296,"url":"/?num=42","pid":30736,"threadId":3} real 0m4.356s user 0m0.000s sys 0m0.006s {"result":267914296,"url":"/?num=42","pid":30736,"threadId":4} real 0m4.365s user 0m0.007s sys 0m0.000s {"result":267914296,"url":"/?num=42","pid":30736,"threadId":5} real 0m4.369s user 0m0.007s sys 0m0.000s
複数のリクエストを送信しても、それほど劣化せずそれぞれのリクエストを処理できています。
リクエストを受け付けてから、子プロセスが終了するまでのログ。
[2023-03-21T15:15:30.236Z] worker-thread start, pid: 30736, thread-id: 2, is-main-thread: true [2023-03-21T15:15:30.241Z] worker-thread accept request, pid = 30736, thread-id = 2, is-main-thread: false [2023-03-21T15:15:30.492Z] worker-thread start, pid: 30736, thread-id: 3, is-main-thread: true [2023-03-21T15:15:30.497Z] worker-thread accept request, pid = 30736, thread-id = 3, is-main-thread: false [2023-03-21T15:15:30.711Z] worker-thread start, pid: 30736, thread-id: 4, is-main-thread: true [2023-03-21T15:15:30.718Z] worker-thread accept request, pid = 30736, thread-id = 4, is-main-thread: false [2023-03-21T15:15:30.936Z] worker-thread start, pid: 30736, thread-id: 5, is-main-thread: true [2023-03-21T15:15:30.941Z] worker-thread accept request, pid = 30736, thread-id = 5, is-main-thread: false [2023-03-21T15:15:34.540Z] worker-thread exit, pid: 30736, thread-id: -1, is-main-thread: true [2023-03-21T15:15:34.795Z] worker-thread exit, pid: 30736, thread-id: -1, is-main-thread: true [2023-03-21T15:15:35.021Z] worker-thread exit, pid: 30736, thread-id: -1, is-main-thread: true [2023-03-21T15:15:35.240Z] worker-thread exit, pid: 30736, thread-id: -1, is-main-thread: true
CPUが複数使われていることを確認。
00時15分30秒 CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle 00時15分31秒 all 53.63 0.00 1.00 0.00 0.00 0.00 0.00 0.00 0.00 45.36 00時15分31秒 0 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 00時15分31秒 1 5.00 0.00 2.00 0.00 0.00 0.00 0.00 0.00 0.00 93.00 00時15分31秒 2 22.00 0.00 1.00 0.00 0.00 0.00 0.00 0.00 0.00 77.00 00時15分31秒 3 80.20 0.00 1.98 0.00 0.00 0.00 0.00 0.00 0.00 17.82 00時15分31秒 4 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 00時15分31秒 5 13.13 0.00 3.03 0.00 0.00 0.00 0.00 0.00 0.00 83.84 00時15分31秒 6 7.14 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 92.86 00時15分31秒 7 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
なお、リクエストを処理している間のスレッドの様子はこんな感じでちょっと多めですが、処理が終了するとスレッドはちゃんと
減ります。Node.jsがいくつかスレッドを管理していそうな感じですね。
$ ps aux -L | grep node | grep dist xxxxx 31040 31040 0.2 11 0.5 1430956 93000 pts/4 Sl+ 00:19 0:00 node dist/threaded-http-server.js xxxxx 31040 31041 0.0 11 0.5 1430956 93000 pts/4 Sl+ 00:19 0:00 node dist/threaded-http-server.js xxxxx 31040 31042 0.0 11 0.5 1430956 93000 pts/4 Sl+ 00:19 0:00 node dist/threaded-http-server.js xxxxx 31040 31043 0.0 11 0.5 1430956 93000 pts/4 Sl+ 00:19 0:00 node dist/threaded-http-server.js xxxxx 31040 31044 0.0 11 0.5 1430956 93000 pts/4 Sl+ 00:19 0:00 node dist/threaded-http-server.js xxxxx 31040 31045 0.0 11 0.5 1430956 93000 pts/4 Sl+ 00:19 0:00 node dist/threaded-http-server.js xxxxx 31040 31046 0.0 11 0.5 1430956 93000 pts/4 Sl+ 00:19 0:00 node dist/threaded-http-server.js xxxxx 31040 31117 112 11 0.5 1430956 93000 pts/4 Rl+ 00:19 0:02 node dist/threaded-http-server.js xxxxx 31040 31120 184 11 0.5 1430956 93000 pts/4 Rl+ 00:19 0:01 node dist/threaded-http-server.js xxxxx 31040 31123 143 11 0.5 1430956 93000 pts/4 Rl+ 00:19 0:01 node dist/threaded-http-server.js xxxxx 31040 31126 101 11 0.5 1430956 93000 pts/4 Rl+ 00:19 0:01 node dist/threaded-http-server.js
ざっくり、こんなところでしょうか。
オマケ
ちょっとしたオマケとして、clusterモジュールを使って親プロセスでHTTPサーバーを起動した場合はどうしたらいいのかな?というのを
試行してみました。
src/prefork-http-server2.ts
import cluster, { Worker } from 'node:cluster'; import http from 'node:http'; import { queryToMap } from './handler'; import { log } from './log'; import { registerShutdown } from './shutdown'; const processes = process.argv[2] !== undefined ? parseInt(process.argv[2], 10) : 4; cluster.setupPrimary({ exec: `${__dirname}/fork-handler` }); const subProcesses: Worker[] = []; for (let i = 0; i < processes; i++) { const subProcess = cluster.fork(); log(`fork process, pid: ${subProcess.process.pid}`); subProcesses.push(subProcess); } log(`prefork, ${processes} processes`); let index = 0; const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => { const query = req.url?.split('?')[1]; const params = queryToMap(query); if (index >= subProcesses.length) { index = 0; } const subProcess = subProcesses[index++]; subProcess.send(JSON.stringify(Object.fromEntries(params))); subProcess.on('message', (result) => { res.writeHead(200, { 'Content-Type': 'application/json' }); res.end( JSON.stringify({ result, url: req.url, pid: subProcess.process.pid, }) ); }); }); httpServer.listen( { host: '0.0.0.0', port: 8000, }, () => registerShutdown(httpServer) ); log(`start prefork http-server, pid: ${process.pid}`);
最初に子プロセスのプールを作って、リクエストが来ると子プロセスにメッセージ送受信する感じになりました。
こちらも簡易的なものですが。
まとめ
Node.jsでマルチプロセス、マルチスレッドを扱う方法をいくつか調べてみました。
これまでNode.jsではこのあたりの話題に触れてこなかったので、いろいろ大変でしたがいい勉強になりました。
アプリケーション内でのプロセス起動、スレッド起動、どちらも使えるんですね。