Node.jsにworkerが入った
Node@10.5.0で入ったworkerの話です。
この記事は、Roppongi.js #4の登壇資料です。
5minで話しきれないので記事にまとめました。
実は、自分がNode.jsに関わって、最初から最後(今現在)までずっと追っている珍しいモジュールです。
worker_threads とは?
実装著者はAnna (このPRはio.js時代にpetkaantonovが実装したのをベースに現環境へ移した)
worker_threadsは独立したスレッドで動作する環境を構築し、それらの間にメッセージチャンネルを構築をする手段を提供します。
注意点として、Node.jsの非同期はworkerよりも効率的なため、I/Oには使用しないほうが良いです。
目的
Node.jsにおいて、大量に負荷の高い処理することは苦手です。
なので、CPU負荷の高い作業を別のスレッドに委ねて、負荷を分散させることが目的です。
child_process や cluster と違う部分
worker_threadsの場合、ArrayBufferのインスタンス間の転送をしたり、 それらの間でSharedArrayBufferのインスタンスを共有をすることによりメモリを効率的に共有することが可能です。
child_process.fork()
や cluster.fork()
と比べるとマルチスレッドに特化したモジュールということです。
会話する方法
child_processのIPCと異なります。
ブラウザ同様に、postMessage
を使用し、会話します。
また、シリアライズされたデータはプロセスを離れる必要がないため全体的に通信に伴うオーバーヘッドが少なくなります。
// child_process // parent const { fork } = require('child_process'); const child = fork('child.js'); child.on('message', message => { console.log('message', message); child.send('from parent'); }); // child process.on('message', (message) => { console.log('message', message); }); if (process.send) process.send('from child');
// worker_threads const { MessageChannel } = require('worker_threads'); const { port1, port2 } = new MessageChannel(); port1.on('message', (message) => console.log(message)); port2.postMessage('hi'); port2.on('message', (message) => console.log(message)); port1.postMessage('bye');
共有範囲
各々のwokerは自身のイベントループを持ちますが、いくつかのリソースに関してはworker間で共有されます。
(e.g. libuvのスレッドプール、V8 Isolate、V8 Environment)
これは現時点では実装されてないだけで、今後変わる可能性はあります。
またグローバルはスレッド間で共有されません。
API制限
禁止
process.chdir()
及びグループまたはユーザーIDを設定するプロセスメソッドprocess.abort()
- domain
- 親プロセスからのIPCチャンネルのアクセス
変更
process.env
は読み取り専用process.title
は変更不可process.exit()
は単一スレッドのみが処理対象process.stdin
,process.stdout
,process.stderr
はnull- シグナルは行われない(
process.on
)
等
歴史
実は、workerとはAyo.jsから来ており、 2017/10/23の時点で初期の実装は完了していました。
上記の記事と大幅な変化は無いため、ある程度説明は省きます。
また、当時はworker
という名前で決定するつもりでしたが、ユーザーランドのライブラリ名と衝突してしまい、最終的(?)にworker_threads
という名前になりました。
このように、長い期間や様々な問題がありながらもようやく入った珍しいモジュールです。
使い方
現時点では実験段階(stability:1 )なので、起動時に --experimental-worker
が必要です。
const { threadId, isMainThread, Worker, workerData, parentPort MessageChannel, MessagePort } = require('worker_threads');
長くなるので、変数、メソッド、クラス等の説明は以下を参照してください。
複数のworkerを動かす
const { Worker, isMainThread, workerData } = require('worker_threads'); let current = 0; function counter(title, cnt) { console.log(`| ${title} |: ${cnt}`); } if (isMainThread) { console.log('Main Thread'); // 4つ作成する for (let i = 0; i < 4; i++) { // 相対パスはダメ // この場合(__filename)はworkerも同じこのファイルを参照する // 第二引数はグローバルパラメーター new Worker(__filename, { workerData: i }); } setInterval((title) => { counter(title, ++current); }, 1000, 'MainThread'); } else { console.log(`worker: ${workerData}`); setInterval((title) => { counter(title, ++current); }, 1000, `worker: ${workerData}`); }
# output $ node --experimental-worker index.js Main Thread worker: 0 worker: 1 worker: 2 worker: 3 | MainThread |: 1 | worker: 0 |: 1 | worker: 1 |: 1 | worker: 2 |: 1 | worker: 3 |: 1 | MainThread |: 2 | worker: 0 |: 2 | worker: 1 |: 2 | worker: 2 |: 2 | worker: 3 |: 2 | MainThread |: 3 | worker: 0 |: 3 | worker: 1 |: 3 | worker: 2 |: 3 | worker: 3 |: 3
スレッド間でメッセージングを行う
// parent.js // main thread const { resolve } = require('path'); const { Worker, workerData } = require('worker_threads'); console.log('| Main Thread |'); function createWorker(path, cb) { const worker = new Worker(path, { workerData: null }); worker.on('message', (msg) => { cb(null, msg); }); worker.on('error', cb); worker.on('exit', (code) => { if (code !== 0) throw new Error('worker stopped'); console.log('| Main Thread | worker stopped'); }); return worker; } const w = createWorker(resolve('child.js'), (err, res) => { // workerから結果を受け取る if (err) { console.error(err); process.exit(1); } console.log(`| Main Thread | execution time: ${res}ms from Worker Thread`); // workerを停止させるためにメッセージを送る w.postMessage('thx;)'); });
// child.js // worker thread const { PerformanceObserver, performance } = require('perf_hooks'); const { parentPort } = require('worker_threads'); console.log('| Worker |'); const obs = new PerformanceObserver((items) => { const time = items.getEntries()[0].duration; // main threadへ処理時間の結果を返す parentPort.postMessage(time); performance.clearMarks(); }); obs.observe({ entryTypes: ['measure'] }); const len = 64 * 1024 * 1024; const b = Buffer.allocUnsafe(len); let s = ''; // 重い処理なためこの箇所の実行時間を計測する performance.mark('A'); for (let i = 0; i < 256; ++i) s += String.fromCharCode(i); for (let i = 0; i < 64 * 1024 * 1024; i += 256) b.write(s, i, 256, 'ascii'); for (let i = 0; i < 32; ++i) b.toString('base64'); performance.mark('B'); performance.measure('A to B', 'A', 'B'); // main threadから返答が来たらこのworkerを終了する parentPort.on('message', (msg) => { console.log(`| Worker | ${msg} from Main Thread`); process.exit(); });
# output $ node --experimental-worker parent.js | Main Thread | | Worker | | Main Thread | execution time: 3128.782205ms from Worker Thread | Worker | thx;) from Main Thread | Main Thread | worker stopped
パフォーマンス
$ ./node benchmark/cluster/echo.js cluster/echo.js n=100000 sendsPerBroadcast=1 payload="string" workers=1: 33,647.30473442063 cluster/echo.js n=100000 sendsPerBroadcast=10 payload="string" workers=1: 12,927.907405288383 cluster/echo.js n=100000 sendsPerBroadcast=1 payload="object" workers=1: 28,496.37373941151 cluster/echo.js n=100000 sendsPerBroadcast=10 payload="object" workers=1: 8,975.53747186485 $ ./node --experimental-worker benchmark/worker/echo.js worker/echo.js n=100000 sendsPerBroadcast=1 payload="string" workers=1: 88,044.32902365089 worker/echo.js n=100000 sendsPerBroadcast=10 payload="string" workers=1: 39,873.33697018837 worker/echo.js n=100000 sendsPerBroadcast=1 payload="object" workers=1: 64,451.29132425621 worker/echo.js n=100000 sendsPerBroadcast=10 payload="object" workers=1: 22,325.635443739284
まだオーバーヘッドが大きく、パフォーマンスには改善の余地があります。
まだできないこと
- ネイティブアドオンをworkerからロードする
- inspectorのサポート
- handle(ポインタを示す)はネットワークソケット同様に転送ができない
- FileクラスやBlobクラスやFile APIなどのサポート(対応するかは未定)
- ...等々
また具体的な計画が出てないため、今後に期待。
さいごに
まだ入ったばかりでこれからどんどんおもしろくなるモジュールです!
パフォーマンスのチューニングには欠かせないモジュールなので、stability2に入ると今後使う機会が増えると思います。
なにか不明なことや近況が知りたい場合は、Twitterまでどうぞ。