CLOVER🍀

That was when it all began.

Node.jsでマルチプロセス(cluster、child_process)、マルチスレッド(worker_threads)を使ってみる

これは、なにをしたくて書いたもの?

Node.jsでマルチプロセス、それからNode.js 10.5.0以降であればマルチスレッドが使えるようなので、興味がてら試してみることに
しました。

Node.jsでマルチプロセス、マルチスレッド

Node.jsでマルチプロセスを扱うには、clusterモジュールまたはchild_processモジュールを使うようです。

Cluster | Node.js v18.15.0 Documentation

Child process | Node.js v18.15.0 Documentation

clusterモジュールは、内部的にはchild_processモジュールを使っているようです。

The worker processes are spawned using the child_process.fork() method, so that they can communicate with the parent via IPC and pass server handles back and forth.

Cluster / How it works

マルチスレッドを扱うには、worker_threadsモジュールを使うようです。こちらは、Node.js 10.5.0から利用可能になったものですね。

Worker threads | Node.js v18.15.0 Documentation

child_processモジュール

最初はマルチプロセスから、そしてclusterモジュールはchild_processモジュールをベースにしているようなので、
まずはchild_processモジュールを見てみましょう。

child_processモジュールは、子プロセスを生成するモジュールです。

The node:child_process module provides the ability to spawn subprocesses in a manner that is similar, but not identical, to popen(3). This capability is primarily provided by the child_process.spawn() function:

Child process

使うメソッドはいくつかありますが、ざっくり以下の3つの系統に分かれます。

  • spawn … 指定されたコマンドを実行する。引数は配列で指定
  • exec … OSのシェルを介して指定されたコマンドを実行する。引数はスペース区切りで指定
  • fork … 実行するモジュールを指定して、新しいNode.jsプロセスを生成する。親プロセスと子プロセスの間はIPC(プロセス間通信)ができる準備が整えられている

上記はサブプロセスが非同期に実行されるのですが、spawnとexecについては同期版もあります。

forkで作成した子プロセスは、IPC(Internal Process Communication、プロセス間通信)でメッセージを送信し合うことができます。

Child process / subprocess.send(message[, sendHandle[, options]][, callback])

メッセージはシリアライズして送信することになります。デフォルトではJSON型式の文字列でシリアライズします。

serialization Specify the kind of serialization used for sending messages between processes. Possible values are 'json' and 'advanced'. See Advanced serialization for more details. Default: 'json'.

Child process / child_process.fork(modulePath[, args][, options])

また、ちょっと変わったケースとしてTCPServerオブジェクトやSocketオブジェクトを直接渡すこともできます。

マルチプロセスなTCPサーバーを作る用途が意識されてそうな感じですね。

clusterモジュール

clusterモジュールは、child_processモジュールのforkを使って実現されるものです。つまり、新しいNode.jsプロセスを生成することに
なります。

The worker processes are spawned using the child_process.fork() method, so that they can communicate with the parent via IPC and pass server handles back and forth.

Cluster / How it works

よってIPCを使って子プロセスとやり取りすることができます。

以下のように、ネットワーク接続を受け付けるようなアプリケーションで使われることを意識していますね。

The cluster module supports two methods of distributing incoming connections.

The first one (and the default one on all platforms except Windows) is the round-robin approach, where the primary process listens on a port, accepts new connections and distributes them across the workers in a round-robin fashion, with some built-in smarts to avoid overloading a worker process.

Although a primary use case for the node:cluster module is networking, it can also be used for other use cases requiring worker processes.

生成されたプロセス自体を管理するのはアプリケーション側の役割で、サブプロセスの数の管理をNode.jsで行うようなことはありません。

Node.js does not automatically manage the number of workers, however. It is the application's responsibility to manage the worker pool based on its own needs.

また、サンプルに書かれているようにサーバーが利用するポートをプロセス間で簡単に共有することができます。

import cluster from 'node:cluster';
import http from 'node:http';
import { availableParallelism } from 'node:os';
import process from 'node:process';

const numCPUs = availableParallelism();

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}

こんな感じで子プロセス側で同じポートをリッスンするServerを作成しているのですが、これで同じポートをリッスンする複数プロセスの
Node.jsアプリケーションが動作するんですよね。

  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
worker_threadsモジュール

Node.jsでマルチスレッドを扱うには、worker_threadsモジュールを使います。

Worker threads | Node.js v18.15.0 Documentation

ワーカースレッドは、CPUバウンドな処理を実行するのに役立ち、IOバウンドな処理には効果的ではないと書かれています。IO操作に
ついては、Node.jsの非同期IOの方がワーカースレッドを使うよりも効率的だそうです。

Workers (threads) are useful for performing CPU-intensive JavaScript operations. They do not help much with I/O-intensive work. The Node.js built-in asynchronous I/O operations are more efficient than Workers can be.

Worker threads

またclusterモジュールやchild_processモジュールと異なり、worker_threadsモジュールではワーカースレッドとメモリを共有できます。

Unlike child_process or cluster, worker_threads can share memory. They do so by transferring ArrayBuffer instances or sharing SharedArrayBuffer instances.

なお、スレッドプールの機能はありません。AsyncResourceを使って実装しているサンプルがあるよ、と書かれています。

When implementing a worker pool, use the AsyncResource API to inform diagnostic tools (e.g. to provide asynchronous stack traces) about the correlation between tasks and their outcomes. See "Using AsyncResource for a Worker thread pool" in the async_hooks documentation for an example implementation.

Worker threads

こちらですね。

Asynchronous context tracking / Class: AsyncResource / Using AsyncResource for a Worker thread pool

要するに自分で作ってね、と…。

お題

マルチプロセスを使っても、マルチスレッドを使っても、CPU数でスケールすることを確認できればよいので、今回はフィボナッチ数の
計算を行うHTTPサーバーをそれぞれのモジュールを使って試してみたいと思います。

HTTPサーバー自体は、こちらを使って作成します。

HTTP | Node.js v18.15.0 Documentation

また、プログラムはTypeScriptで書くことにします。

環境

今回の環境は、こちら。

$ node --version
v18.15.0


$ npm --version
9.5.0

準備

Node.js+TypeScriptの環境を作成します。

$ npm init -y
$ npm i -D typescript
$ npm i -D @types/node@v18
$ npm i -D prettier
$ mkdir src

依存関係はこんな感じになりました。

  "devDependencies": {
    "@types/node": "^18.15.3",
    "prettier": "^2.8.5",
    "typescript": "^5.0.2"
  }

scripts

  "scripts": {
    "build": "tsc --project .",
    "build:watch": "tsc --project . --watch",
    "format": "prettier --write src"
  },

設定ファイル。

tsconfig.json

{
  "compilerOptions": {
    "target": "esnext",
    "module": "commonjs",
    "moduleResolution": "node",
    "lib": ["esnext"],
    "baseUrl": "./src",
    "outDir": "dist",
    "strict": true,
    "forceConsistentCasingInFileNames": true,
    "noFallthroughCasesInSwitch": true,
    "noImplicitOverride": true,
    "noImplicitReturns": true,
    "noPropertyAccessFromIndexSignature": true,
    "esModuleInterop": true
  },
  "include": [
    "src"
  ]
}

.prettierrc.json

{
  "singleQuote": true,
  "printWidth": 120
}

まずはシンプルに

まずはマルチプロセスもマルチスレッドも使わないバージョンを作成します。

この後、種々のモジュールを使って作成するプログラムでも扱いやすいように、いくつかのファイルに分割して書いていきます。

フィボナッチ数の計算は、http://localhost:8000?num=[整数]のような指定で行うものとします。

フィボナッチ数の計算を行う関数。

src/fib.ts

export function fib(n: number): number {
  if (n === 0) {
    return 0;
  } else if (n === 1) {
    return 1;
  } else {
    return fib(n - 1) + fib(n - 2);
  }
}

QueryStringをMapに分解したり、その内容をfib関数に渡すための関数。

src/handler.ts

import { fib } from './fib';

export function fibRequestHandler(params: Map<string, string>): number | undefined {
  let result;

  if (params.has('num')) {
    result = fib(parseInt(params.get('num')!, 10));
  } else {
    result = undefined;
  }

  return result;
}

export function queryToMap(query: string | undefined): Map<string, string> {
  const map = new Map<string, string>();

  if (!query) {
    return map;
  }

  for (const pair of query.split('&')) {
    const [name, value] = pair.split('=');
    map.set(name, value);
  }

  return map;
}

ログ出力。

src/log.ts

export function log(message: string): void {
  console.log(`[${new Date().toISOString()}] ${message}`);
}

シャットダウン。

src/shutdown.ts

import http from 'node:http';
import { log } from './log';

export function registerShutdown(httpServer: http.Server): void {
  process.on('SIGTERM', signalHandler(httpServer));
  process.on('SIGINT', signalHandler(httpServer));
}

function signalHandler(httpServer: http.Server): (signal: NodeJS.Signals) => void {
  return (signal) => {
    httpServer.close(() => log(`shutdown, signal: ${signal}`));
  };
}

ここまでが基本的な部品ですね。

では、サーバー部分を書きます。

src/simple-http-server.ts

import http from 'node:http';
import { fibRequestHandler, queryToMap } from './handler';
import { log } from './log';
import { registerShutdown } from './shutdown';

const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => {
  const query = req.url?.split('?')[1];
  const params = queryToMap(query);

  log(`accept request, pid = ${process.pid}`);

  const result = fibRequestHandler(params);

  res.writeHead(200, { 'Content-Type': 'application/json' });
  res.end(
    JSON.stringify({
      result,
      url: req.url,
      pid: process.pid,
    })
  );
});

httpServer.listen(
  {
    host: '0.0.0.0',
    port: 8000,
  },
  () => registerShutdown(httpServer)
);

log(`http-server, pid: ${process.pid}`);

ビルドして

$ npm run build

起動。

$ node dist/simple-http-server.js
[2023-03-21T13:59:12.290Z] http-server, pid: 27141

確認。

$ curl localhost:8000?num=10
{"result":55,"url":"/?num=10","pid":27141}

こんな感じで返ってきます。

手元の環境では、42くらいがちょうどいい時間がかかりました。

$ time curl localhost:8000?num=42
{"result":267914296,"url":"/?num=42","pid":27141}
real    0m3.698s
user    0m0.009s
sys     0m0.003s

この時、mpstatで見ると

$ mpstat -P ALL 1

CPUをひとつ使い切っていることがわかります。

23時02分08秒  CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
23時02分09秒  all   13.32    0.00    0.00    0.13    0.00    0.00    0.00    0.00    0.00   86.56
23時02分09秒    0    1.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   99.00
23時02分09秒    1  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
23時02分09秒    2    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
23時02分09秒    3    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
23時02分09秒    4    3.03    0.00    0.00    1.01    0.00    0.00    0.00    0.00    0.00   95.96
23時02分09秒    5    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
23時02分09秒    6    1.98    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   98.02
23時02分09秒    7    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00

では、これを連続で4回実行してみます。

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

リクエストを受け付けた時のログ。

[2023-03-21T14:01:04.742Z] accept request, pid = 27141
[2023-03-21T14:02:06.498Z] accept request, pid = 27141
[2023-03-21T14:03:17.756Z] accept request, pid = 27141
[2023-03-21T14:03:21.562Z] accept request, pid = 27141

結果。

$ {"result":267914296,"url":"/?num=42","pid":27141}
real    0m3.814s
user    0m0.004s
sys     0m0.005s


{"result":267914296,"url":"/?num=42","pid":27141}
real    0m7.285s
user    0m0.000s
sys     0m0.006s


{"result":267914296,"url":"/?num=42","pid":27141}
real    0m10.876s
user    0m0.007s
sys     0m0.000s


{"result":267914296,"url":"/?num=42","pid":27141}
real    0m15.284s
user    0m0.004s
sys     0m0.004s

ほぼ線形に遅くなっています。

使えるCPUコアはひとつだからですね。

23時04分25秒  CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
23時04分26秒  all   12.96    0.00    0.13    0.00    0.00    0.00    0.00    0.00    0.00   86.92
23時04分26秒    0  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
23時04分26秒    1    1.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   99.00
23時04分26秒    2    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
23時04分26秒    3    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
23時04分26秒    4    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
23時04分26秒    5    1.01    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   98.99
23時04分26秒    6    1.01    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   98.99
23時04分26秒    7    0.00    0.00    0.99    0.00    0.00    0.00    0.00    0.00    0.00   99.01

Ctrl-cで停止。

[2023-03-21T14:05:04.163Z] shutdown, signal: SIGINT

これを、マルチプロセスやマルチスレッドを使って変えていきましょう。

また、以降はTypeScriptソースコードをビルドするステップは省略します。

clusterモジュールで書く

最初はclusterモジュールを使って書いてみます。

src/prefork-http-server.ts

import cluster from 'node:cluster';
import http from 'node:http';
import { fibRequestHandler, queryToMap } from './handler';
import { log } from './log';
import { registerShutdown } from './shutdown';

const processes = process.argv[2] !== undefined ? parseInt(process.argv[2], 10) : 4;

if (cluster.isPrimary) {
  // main process
  for (let i = 0; i < processes; i++) {
    const subProcess = cluster.fork();
    log(`fork process, pid: ${process.pid}, sub-process pid: ${subProcess.process.pid}`);
  }

  log(`prefork, ${processes} processes`);
} else {
  // sub process
  const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => {
    const query = req.url?.split('?')[1];
    const params = queryToMap(query);

    log(`sub-process accept request, pid = ${process.pid}`);

    const result = fibRequestHandler(params);

    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(
      JSON.stringify({
        result,
        url: req.url,
        pid: process.pid,
      })
    );
  });

  httpServer.listen({
    host: '0.0.0.0',
    port: 8000,
  }, () => registerShutdown(httpServer));

  log(`start prefork http-server, pid: ${process.pid}`);
}

起動するプロセス数は、4または起動引数で指定した数にしました。

const processes = process.argv[2] !== undefined ? parseInt(process.argv[2], 10) : 4;

指定数のプロセスを、最初にfork

  for (let i = 0; i < processes; i++) {
    const subProcess = cluster.fork();
    log(`fork process, pid: ${process.pid}, sub-process pid: ${subProcess.process.pid}`);
  }

forkすると、Node.jsを起動した時と同じ処理が呼び出されるようで(fork呼び出したソースコードではなく)、自分がfork元か
そうでないかとisPrimaryで判定することになります。

if (cluster.isPrimary) {
  // main process
  
  ...
} else {
  // sub process

  ...
}

Cluster / cluster.isPrimary

これだけで簡単にマルチプロセスかすることができます。

子プロセスでHTTPサーバーをそれぞれ作って同じポートをリッスンするように書いていますが、これはプロセス間で共有するらしく
問題なく動きます。

  // sub process
  const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => {
    const query = req.url?.split('?')[1];
    const params = queryToMap(query);

    log(`sub-process accept request, pid = ${process.pid}`);

    const result = fibRequestHandler(params);

    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(
      JSON.stringify({
        result,
        url: req.url,
        pid: process.pid,
      })
    );
  });

  httpServer.listen({
    host: '0.0.0.0',
    port: 8000,
  }, () => registerShutdown(httpServer));

ちょっと不思議ですね。

ビルドして起動。

$ node dist/prefork-http-server.js
[2023-03-21T14:34:11.205Z] fork process, pid: 28781, sub-process pid: 28788
[2023-03-21T14:34:11.209Z] fork process, pid: 28781, sub-process pid: 28789
[2023-03-21T14:34:11.211Z] fork process, pid: 28781, sub-process pid: 28795
[2023-03-21T14:34:11.213Z] fork process, pid: 28781, sub-process pid: 28796
[2023-03-21T14:34:11.213Z] prefork, 4 processes
[2023-03-21T14:34:11.251Z] start prefork http-server, pid: 28788
[2023-03-21T14:34:11.257Z] start prefork http-server, pid: 28789
[2023-03-21T14:34:11.259Z] start prefork http-server, pid: 28795
[2023-03-21T14:34:11.263Z] start prefork http-server, pid: 28796

4つの子プロセスが起動されます。psで確認すると、確かに増えています。

xxxxx   28781    9821  0 23:34 pts/4    00:00:00 node dist/prefork-http-server.js
xxxxx   28788   28781  0 23:34 pts/4    00:00:00 /path/to/node /path/to/dist/prefork-http-server.js
xxxxx   28789   28781  0 23:34 pts/4    00:00:00 /path/to/node /path/to/dist/prefork-http-server.js
xxxxx   28795   28781  0 23:34 pts/4    00:00:00 /path/to/node /path/to/dist/prefork-http-server.js
xxxxx   28796   28781  0 23:34 pts/4    00:00:00 /path/to/node /path/to/dist/prefork-http-server.js

先程のように、リクエストを4つ送ってみます。

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

リクエストを受け付けた時のログ。

[2023-03-21T14:37:23.300Z] sub-process accept request, pid = 28788
[2023-03-21T14:37:23.525Z] sub-process accept request, pid = 28789
[2023-03-21T14:37:23.741Z] sub-process accept request, pid = 28795
[2023-03-21T14:37:23.965Z] sub-process accept request, pid = 28796

それぞれ別のプロセスが受け付けたようです。

結果。

{"result":267914296,"url":"/?num=42","pid":28788}
real    0m4.196s
user    0m0.001s
sys     0m0.008s


{"result":267914296,"url":"/?num=42","pid":28789}
real    0m4.265s
user    0m0.006s
sys     0m0.000s


{"result":267914296,"url":"/?num=42","pid":28795}
real    0m4.207s
user    0m0.006s
sys     0m0.000s


{"result":267914296,"url":"/?num=42","pid":28796}
real    0m4.239s
user    0m0.007s
sys     0m0.000s

それぞれ、ほぼ同じ処理時間で返ってくるようになりました。OKそうです。

mpstatでは、こんな感じに複数のCPUが使われていることが確認できます。

23時37分26秒  CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
23時37分27秒  all   50.63    0.00    0.00    0.25    0.00    0.00    0.00    0.00    0.00   49.12
23時37分27秒    0    2.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   98.00
23時37分27秒    1   99.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    1.00
23時37分27秒    2    1.01    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   98.99
23時37分27秒    3  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
23時37分27秒    4   24.24    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   75.76
23時37分27秒    5   76.77    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   23.23
23時37分27秒    6   32.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   68.00
23時37分27秒    7   69.70    0.00    0.00    2.02    0.00    0.00    0.00    0.00    0.00   28.28

今回は4つの子プロセスで起動したので、それ以上のリクエストを同時に送信するとその場合は遅延するようになります。

こんな感じで、起動するプロセス数を絞っても確認しやすいと思います。

$ node dist/prefork-http-server.js 2
[2023-03-21T14:38:37.798Z] fork process, pid: 29087, sub-process pid: 29094
[2023-03-21T14:38:37.803Z] fork process, pid: 29087, sub-process pid: 29095
[2023-03-21T14:38:37.803Z] prefork, 2 processes
[2023-03-21T14:38:37.837Z] start prefork http-server, pid: 29094
[2023-03-21T14:38:37.843Z] start prefork http-server, pid: 29095

停止。

[2023-03-21T14:38:25.955Z] shutdown, signal: SIGINT
[2023-03-21T14:38:25.955Z] shutdown, signal: SIGINT
[2023-03-21T14:38:25.955Z] shutdown, signal: SIGINT
[2023-03-21T14:38:25.955Z] shutdown, signal: SIGINT

child_processモジュールを使う

次は、child_processモジュールを使ってみます。

今回は単純化のために、リクエストを受け付けたらforkするようにしました。

src/fork-http-server.ts

import { fork } from 'node:child_process';
import http from 'node:http';
import { queryToMap } from './handler';
import { log } from './log';
import { registerShutdown } from './shutdown';

const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => {
  const subProcess = fork(`${__dirname}/fork-handler`);

  subProcess.on('spawn', () => {
    log(`sub-process start, pid: ${subProcess.pid}`);
  });

  const query = req.url?.split('?')[1];
  const params = queryToMap(query);

  subProcess.send(JSON.stringify(Object.fromEntries(params)));

  subProcess.on('message', (result) => {
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(
      JSON.stringify({
        result,
        url: req.url,
        pid: subProcess.pid,
      })
    );

    res.on('close', () => {
      subProcess.kill();
    });
  });

  subProcess.on('exit', (code, signal) => {
    log(`sub-process exit, pid: ${subProcess.pid}, code: ${code}, signal: ${signal}`);
  });
});

httpServer.listen({
  host: '0.0.0.0',
  port: 8000,
});

log(`start fork http-server, pid: ${process.pid}`);

registerShutdown(httpServer);

リクエストを受け付けたら、まずはforkします。

const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => {
  const subProcess = fork(`${__dirname}/fork-handler`);

子プロセスに、IPCで処理を行うためのメッセージを送信。

  subProcess.send(JSON.stringify(Object.fromEntries(params)));

子プロセスが計算した値を、レスポンスとして返します。

  subProcess.on('message', (result) => {
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(
      JSON.stringify({
        result,
        url: req.url,
        pid: subProcess.pid,
      })
    );

レスポンスの送信が完了したら、子プロセスを終了。

    res.on('close', () => {
      subProcess.kill();
    });
  });

こんな感じで、計算処理の部分は子プロセスで、リクエストやレスポンスの読み出しなどは親プロセスで行うようにしました。

また、forkで起動するモジュールでは親プロセスからメッセージを受け取り、メッセージを返す必要があるので以下のような
モジュールを作成しました。

src/fork-handler.ts

import { fibRequestHandler } from './handler';
import { log } from './log';

process.on('message', (paramsAsString: string) => {
  log(`sub-process accept request, pid = ${process.pid}`);

  const params = new Map<string, string>(Object.entries(JSON.parse(paramsAsString)));

  const result = fibRequestHandler(params);

  process.send!({ result });
});

プロセス間のメッセージの送信はsendで、

受信はon('message', handler)で行います。

わかりにくいですが、以下では親プロセスからメッセージを受け取り、処理が終わったら親プロセスにメッセージを送信しています。

process.on('message', (paramsAsString: string) => {
  log(`sub-process accept request, pid = ${process.pid}`);

  const params = new Map<string, string>(Object.entries(JSON.parse(paramsAsString)));

  const result = fibRequestHandler(params);

  process.send!({ result });
});

親プロセスから子プロセスに情報を渡し、子プロセスから結果を受け取っているのは以下の部分ですね。

  subProcess.send(JSON.stringify(Object.fromEntries(params)));

  subProcess.on('message', (result) => {
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(
      JSON.stringify({
        result,
        url: req.url,
        pid: subProcess.pid,
      })
    );

起動。

$ node dist/fork-http-server.js
[2023-03-21T14:39:00.351Z] start fork http-server, pid: 29124

リクエストを1つ送ってみます。

$ time curl localhost:8000?num=42

ログには、こんな感じでforkされて計算処理を行い、終わったらプロセスを終了させていることを確認できます。

[2023-03-21T14:39:37.740Z] sub-process start, pid: 29148
[2023-03-21T14:39:37.773Z] sub-process accept request, pid = 29148
[2023-03-21T14:39:41.418Z] sub-process exit, pid: 29148, code: null, signal: SIGTERM

では、リクエストを4つ送ってみます。

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

結果。

{"result":{"result":267914296},"url":"/?num=42","pid":29192}
real    0m4.192s
user    0m0.005s
sys     0m0.004s


{"result":{"result":267914296},"url":"/?num=42","pid":29201}
real    0m4.261s
user    0m0.004s
sys     0m0.002s


{"result":{"result":267914296},"url":"/?num=42","pid":29210}
real    0m4.251s
user    0m0.003s
sys     0m0.003s


{"result":{"result":267914296},"url":"/?num=42","pid":29219}
real    0m4.209s
user    0m0.003s
sys     0m0.004s

リクエストを受け付けてから、子プロセスが終了するまでのログ。

[2023-03-21T14:40:43.341Z] sub-process start, pid: 29192
[2023-03-21T14:40:43.377Z] sub-process accept request, pid = 29192
[2023-03-21T14:40:43.888Z] sub-process start, pid: 29201
[2023-03-21T14:40:43.923Z] sub-process accept request, pid = 29201
[2023-03-21T14:40:44.216Z] sub-process start, pid: 29210
[2023-03-21T14:40:44.251Z] sub-process accept request, pid = 29210
[2023-03-21T14:40:44.520Z] sub-process start, pid: 29219
[2023-03-21T14:40:44.559Z] sub-process accept request, pid = 29219
[2023-03-21T14:40:47.524Z] sub-process exit, pid: 29192, code: null, signal: SIGTERM
[2023-03-21T14:40:48.144Z] sub-process exit, pid: 29201, code: null, signal: SIGTERM
[2023-03-21T14:40:48.462Z] sub-process exit, pid: 29210, code: null, signal: SIGTERM
[2023-03-21T14:40:48.723Z] sub-process exit, pid: 29219, code: null, signal: SIGTERM

CPUが複数使われていることを確認。

23時40分46秒  CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
23時40分47秒  all   50.63    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   49.37
23時40分47秒    0    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
23時40分47秒    1  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
23時40分47秒    2  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
23時40分47秒    3    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00  100.00
23時40分47秒    4  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
23時40分47秒    5    1.01    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   98.99
23時40分47秒    6  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
23時40分47秒    7    1.01    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   98.99

なお、リクエストを処理している間のプロセスを見ると、以下のようになっています。

xxxxx   29124    9821  0 23:39 pts/4    00:00:00 node dist/fork-http-server.js
xxxxx   29301   29124 73 23:42 pts/4    00:00:02 /path/to/node /path/to/dist/fork-handler
xxxxx   29310   29124 97 23:42 pts/4    00:00:01 /path/to/node /path/to/dist/fork-handler
xxxxx   29319   29124 84 23:42 pts/4    00:00:01 /path/to/node /path/to/dist/fork-handler
xxxxx   29328   29124 73 23:42 pts/4    00:00:01 /path/to/node /path/to/dist/fork-handler

リクエストが終了次第、forkした子プロセスは終了していきます。

終了。

[2023-03-21T14:45:49.815Z] shutdown, signal: SIGINT

worker_threadsモジュールを使う

最後は、worker_threadsモジュールを使ってマルチスレッドで動作するようにしましょう。

こちらも、簡単のためにリクエストを受け付けたら新しくスレッドを起動するようにしています。

src/threaded-http-server.ts

import http from 'node:http';
import { Worker, isMainThread, threadId } from 'node:worker_threads';
import { queryToMap } from './handler';
import { log } from './log';
import { registerShutdown } from './shutdown';

log(`main-thread, pid = ${process.pid}, thread-id = ${threadId}, is-main-thread: ${isMainThread}`);

const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => {
  const query = req.url?.split('?')[1];
  const params = queryToMap(query);

  const worker = new Worker(`${__dirname}/worker-handler`, { workerData: params });

  worker.on('online', () => {
    log(`worker-thread start, pid: ${process.pid}, thread-id: ${worker.threadId}, is-main-thread: ${isMainThread}`);
  });

  worker.on('message', (result) => {
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(
      JSON.stringify({
        result,
        url: req.url,
        pid: process.pid,
        threadId: worker.threadId,
      })
    );
  });

  worker.on('exit', () => {
    log(`worker-thread exit, pid: ${process.pid}, thread-id: ${worker.threadId}, is-main-thread: ${isMainThread}`);
  });
});

httpServer.listen(
  {
    host: '0.0.0.0',
    port: 8000,
  },
  () => registerShutdown(httpServer)
);

log(`start threaded http-server, pid: ${process.pid}, thread-id: ${threadId}, is-main-thread: ${isMainThread}`);

スレッドは、Workerに対象のモジュールを指定することで起動します。

  const worker = new Worker(`${__dirname}/worker-handler`, { workerData: params });

この時に、スレッドに渡すデータも指定できます。

コンストラクタ以外でも、postMessageでも渡せそうですね。

Worker threads / new Worker(filename[, options]) / worker.postMessage(value[, transferList])

計算処理は起動したスレッドで行い、メッセージとして受け取ります。

  worker.on('message', (result) => {
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(
      JSON.stringify({
        result,
        url: req.url,
        pid: process.pid,
        threadId: worker.threadId,
      })
    );
  });

この時にスレッドのidも返すようにしました。

この値は、プラットフォームのスレッドのidとは関係がなさそうです。

Worker threads / worker.threadId

また、現在のスレッドがメインスレッドかどうかはisMainThreadで判定することができます。今回はログ出力に使っています。

Worker threads / worker.isMainThread

isMainThreadtrueを返す場合は、Worker内で動作していないことを示します。

起動したスレッドとメッセージの送受信をする必要があるので、child_processモジュールを使った時と同じようにモジュールを用意しました。

src/worker-handler.ts

import { isMainThread, parentPort, threadId, workerData } from 'node:worker_threads';
import { fibRequestHandler } from './handler';
import { log } from './log';

log(`worker-thread accept request, pid = ${process.pid}, thread-id = ${threadId}, is-main-thread: ${isMainThread}`);

const params: Map<string, string> = workerData;

const result = fibRequestHandler(workerData);

parentPort?.postMessage(result);

Worker作成時に渡したデータは、workerDataで参照できます。

Worker threads / worker.workerData

そして、親のスレッドへデータを渡すには、parentPort経由でpostMessageを使うことで行えます。

こちらを使うことで、起動元・先のスレッド間でデータのやり取りを行うことができます。

起動。

$ node dist/threaded-http-server.js
[2023-03-21T15:13:59.056Z] main-thread, pid = 30736, thread-id = 0, is-main-thread: true
[2023-03-21T15:13:59.061Z] start threaded http-server, pid: 30736, thread-id: 0, is-main-thread: true

リクエストを1つ送ってみます。

$ time curl localhost:8000?num=42
{"result":267914296,"url":"/?num=42","pid":30736,"threadId":1}
real    0m3.793s
user    0m0.003s
sys     0m0.008s

ログには、こんな感じで別スレッドで計算処理を行い、終わったらスレッドを終了させていることを確認できます。

[2023-03-21T15:14:38.375Z] worker-thread start, pid: 30736, thread-id: 1, is-main-thread: true
[2023-03-21T15:14:38.380Z] worker-thread accept request, pid = 30736, thread-id = 1, is-main-thread: false
[2023-03-21T15:14:42.099Z] worker-thread exit, pid: 30736, thread-id: -1, is-main-thread: true

スレッド終了のイベントハンドリングの時には、スレッドのidがわからなくなるみたいですね。

では、リクエストを4つ送ってみます。

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

$ time curl localhost:8000?num=42

結果。

{"result":267914296,"url":"/?num=42","pid":30736,"threadId":2}
real    0m4.369s
user    0m0.003s
sys     0m0.007s


{"result":267914296,"url":"/?num=42","pid":30736,"threadId":3}
real    0m4.356s
user    0m0.000s
sys     0m0.006s


{"result":267914296,"url":"/?num=42","pid":30736,"threadId":4}
real    0m4.365s
user    0m0.007s
sys     0m0.000s


{"result":267914296,"url":"/?num=42","pid":30736,"threadId":5}
real    0m4.369s
user    0m0.007s
sys     0m0.000s

複数のリクエストを送信しても、それほど劣化せずそれぞれのリクエストを処理できています。

リクエストを受け付けてから、子プロセスが終了するまでのログ。

[2023-03-21T15:15:30.236Z] worker-thread start, pid: 30736, thread-id: 2, is-main-thread: true
[2023-03-21T15:15:30.241Z] worker-thread accept request, pid = 30736, thread-id = 2, is-main-thread: false
[2023-03-21T15:15:30.492Z] worker-thread start, pid: 30736, thread-id: 3, is-main-thread: true
[2023-03-21T15:15:30.497Z] worker-thread accept request, pid = 30736, thread-id = 3, is-main-thread: false
[2023-03-21T15:15:30.711Z] worker-thread start, pid: 30736, thread-id: 4, is-main-thread: true
[2023-03-21T15:15:30.718Z] worker-thread accept request, pid = 30736, thread-id = 4, is-main-thread: false
[2023-03-21T15:15:30.936Z] worker-thread start, pid: 30736, thread-id: 5, is-main-thread: true
[2023-03-21T15:15:30.941Z] worker-thread accept request, pid = 30736, thread-id = 5, is-main-thread: false
[2023-03-21T15:15:34.540Z] worker-thread exit, pid: 30736, thread-id: -1, is-main-thread: true
[2023-03-21T15:15:34.795Z] worker-thread exit, pid: 30736, thread-id: -1, is-main-thread: true
[2023-03-21T15:15:35.021Z] worker-thread exit, pid: 30736, thread-id: -1, is-main-thread: true
[2023-03-21T15:15:35.240Z] worker-thread exit, pid: 30736, thread-id: -1, is-main-thread: true

CPUが複数使われていることを確認。

00時15分30秒  CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest  %gnice   %idle
00時15分31秒  all   53.63    0.00    1.00    0.00    0.00    0.00    0.00    0.00    0.00   45.36
00時15分31秒    0  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
00時15分31秒    1    5.00    0.00    2.00    0.00    0.00    0.00    0.00    0.00    0.00   93.00
00時15分31秒    2   22.00    0.00    1.00    0.00    0.00    0.00    0.00    0.00    0.00   77.00
00時15分31秒    3   80.20    0.00    1.98    0.00    0.00    0.00    0.00    0.00    0.00   17.82
00時15分31秒    4  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00
00時15分31秒    5   13.13    0.00    3.03    0.00    0.00    0.00    0.00    0.00    0.00   83.84
00時15分31秒    6    7.14    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00   92.86
00時15分31秒    7  100.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00    0.00

なお、リクエストを処理している間のスレッドの様子はこんな感じでちょっと多めですが、処理が終了するとスレッドはちゃんと
減ります。Node.jsがいくつかスレッドを管理していそうな感じですね。

$ ps aux -L | grep node | grep dist
xxxxx   31040   31040  0.2   11  0.5 1430956 93000 pts/4   Sl+  00:19   0:00 node dist/threaded-http-server.js
xxxxx   31040   31041  0.0   11  0.5 1430956 93000 pts/4   Sl+  00:19   0:00 node dist/threaded-http-server.js
xxxxx   31040   31042  0.0   11  0.5 1430956 93000 pts/4   Sl+  00:19   0:00 node dist/threaded-http-server.js
xxxxx   31040   31043  0.0   11  0.5 1430956 93000 pts/4   Sl+  00:19   0:00 node dist/threaded-http-server.js
xxxxx   31040   31044  0.0   11  0.5 1430956 93000 pts/4   Sl+  00:19   0:00 node dist/threaded-http-server.js
xxxxx   31040   31045  0.0   11  0.5 1430956 93000 pts/4   Sl+  00:19   0:00 node dist/threaded-http-server.js
xxxxx   31040   31046  0.0   11  0.5 1430956 93000 pts/4   Sl+  00:19   0:00 node dist/threaded-http-server.js
xxxxx   31040   31117  112   11  0.5 1430956 93000 pts/4   Rl+  00:19   0:02 node dist/threaded-http-server.js
xxxxx   31040   31120  184   11  0.5 1430956 93000 pts/4   Rl+  00:19   0:01 node dist/threaded-http-server.js
xxxxx   31040   31123  143   11  0.5 1430956 93000 pts/4   Rl+  00:19   0:01 node dist/threaded-http-server.js
xxxxx   31040   31126  101   11  0.5 1430956 93000 pts/4   Rl+  00:19   0:01 node dist/threaded-http-server.js

ざっくり、こんなところでしょうか。

オマケ

ちょっとしたオマケとして、clusterモジュールを使って親プロセスでHTTPサーバーを起動した場合はどうしたらいいのかな?というのを
試行してみました。

src/prefork-http-server2.ts

import cluster, { Worker } from 'node:cluster';
import http from 'node:http';
import { queryToMap } from './handler';
import { log } from './log';
import { registerShutdown } from './shutdown';

const processes = process.argv[2] !== undefined ? parseInt(process.argv[2], 10) : 4;

cluster.setupPrimary({ exec: `${__dirname}/fork-handler` });

const subProcesses: Worker[] = [];

for (let i = 0; i < processes; i++) {
  const subProcess = cluster.fork();
  log(`fork process, pid: ${subProcess.process.pid}`);

  subProcesses.push(subProcess);
}

log(`prefork, ${processes} processes`);

let index = 0;

const httpServer = http.createServer((req: http.IncomingMessage, res: http.ServerResponse) => {
  const query = req.url?.split('?')[1];
  const params = queryToMap(query);

  if (index >= subProcesses.length) {
    index = 0;
  }

  const subProcess = subProcesses[index++];

  subProcess.send(JSON.stringify(Object.fromEntries(params)));

  subProcess.on('message', (result) => {
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(
      JSON.stringify({
        result,
        url: req.url,
        pid: subProcess.process.pid,
      })
    );
  });
});

httpServer.listen(
  {
    host: '0.0.0.0',
    port: 8000,
  },
  () => registerShutdown(httpServer)
);

log(`start prefork http-server, pid: ${process.pid}`);

最初に子プロセスのプールを作って、リクエストが来ると子プロセスにメッセージ送受信する感じになりました。

こちらも簡易的なものですが。

まとめ

Node.jsでマルチプロセス、マルチスレッドを扱う方法をいくつか調べてみました。

これまでNode.jsではこのあたりの話題に触れてこなかったので、いろいろ大変でしたがいい勉強になりました。

アプリケーション内でのプロセス起動、スレッド起動、どちらも使えるんですね。