技術探し

JavaScriptを中心に記事を書いていきます :;(∩´﹏`∩);:

Node.jsにworkerが入った

Node@10.5.0で入ったworkerの話です。

この記事は、Roppongi.js #4の登壇資料です。
5minで話しきれないので記事にまとめました。

slides.hiroppy.me

実は、自分がNode.jsに関わって、最初から最後(今現在)までずっと追っている珍しいモジュールです。

worker_threads とは?

github.com

実装著者はAnna (このPRはio.js時代にpetkaantonovが実装したのをベースに現環境へ移した)

worker_threadsは独立したスレッドで動作する環境を構築し、それらの間にメッセージチャンネルを構築をする手段を提供します。
注意点として、Node.jsの非同期はworkerよりも効率的なため、I/Oには使用しないほうが良いです。

目的

Node.jsにおいて、大量に負荷の高い処理することは苦手です。
なので、CPU負荷の高い作業を別のスレッドに委ねて、負荷を分散させることが目的です。

child_process や cluster と違う部分

worker_threadsの場合、ArrayBufferのインスタンス間の転送をしたり、 それらの間でSharedArrayBufferのインスタンスを共有をすることによりメモリを効率的に共有することが可能です。
child_process.fork()cluster.fork() と比べるとマルチスレッドに特化したモジュールということです。

会話する方法

child_processのIPCと異なります。
ブラウザ同様に、postMessage を使用し、会話します。
また、シリアライズされたデータはプロセスを離れる必要がないため全体的に通信に伴うオーバーヘッドが少なくなります。

// child_process

// parent
const { fork } = require('child_process');

const child = fork('child.js');

child.on('message', message => {
  console.log('message', message);
  child.send('from parent');
});

// child
process.on('message', (message) => {
  console.log('message', message);
});

if (process.send) process.send('from child');
// worker_threads
const { MessageChannel } = require('worker_threads');

const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));
port2.postMessage('hi');

port2.on('message', (message) => console.log(message));
port1.postMessage('bye');

共有範囲

各々のwokerは自身のイベントループを持ちますが、いくつかのリソースに関してはworker間で共有されます。
(e.g. libuvのスレッドプール、V8 Isolate、V8 Environment)

これは現時点では実装されてないだけで、今後変わる可能性はあります。

またグローバルはスレッド間で共有されません。

API制限

禁止
  • process.chdir() 及びグループまたはユーザーIDを設定するプロセスメソッド
  • process.abort()
  • domain
  • 親プロセスからのIPCチャンネルのアクセス
変更
  • process.env は読み取り専用
  • process.titleは変更不可
  • process.exit()は単一スレッドのみが処理対象
  • process.stdin, process.stdout, process.stderr はnull
  • シグナルは行われない(process.on)

歴史

実は、workerとはAyo.jsから来ており、 2017/10/23の時点で初期の実装は完了していました。

blog.hiroppy.me

上記の記事と大幅な変化は無いため、ある程度説明は省きます。

また、当時はworker という名前で決定するつもりでしたが、ユーザーランドのライブラリ名と衝突してしまい、最終的(?)にworker_threadsという名前になりました。

github.com

blog.hiroppy.me

このように、長い期間や様々な問題がありながらもようやく入った珍しいモジュールです。

使い方

現時点では実験段階(stability:1 )なので、起動時に --experimental-worker が必要です。

const {
  threadId, isMainThread,
  Worker, workerData, parentPort
  MessageChannel, MessagePort
} = require('worker_threads');

長くなるので、変数、メソッド、クラス等の説明は以下を参照してください。

AyoでWorkerの実装が進んでいる - 技術探し

複数のworkerを動かす

const { Worker, isMainThread, workerData } = require('worker_threads');

let current = 0;

function counter(title, cnt) {
  console.log(`| ${title} |: ${cnt}`);
}

if (isMainThread) {
  console.log('Main Thread');

  // 4つ作成する
  for (let i = 0; i < 4; i++) {
    // 相対パスはダメ
    // この場合(__filename)はworkerも同じこのファイルを参照する
    // 第二引数はグローバルパラメーター
    new Worker(__filename, { workerData: i });
  }

  setInterval((title) => {
    counter(title, ++current);
  }, 1000, 'MainThread');
} else {
  console.log(`worker: ${workerData}`);

  setInterval((title) => {
    counter(title, ++current);
  }, 1000, `worker: ${workerData}`);
}
# output
$ node --experimental-worker index.js
Main Thread
worker: 0
worker: 1
worker: 2
worker: 3
| MainThread |: 1
| worker: 0 |: 1
| worker: 1 |: 1
| worker: 2 |: 1
| worker: 3 |: 1
| MainThread |: 2
| worker: 0 |: 2
| worker: 1 |: 2
| worker: 2 |: 2
| worker: 3 |: 2
| MainThread |: 3
| worker: 0 |: 3
| worker: 1 |: 3
| worker: 2 |: 3
| worker: 3 |: 3

スレッド間でメッセージングを行う

// parent.js
// main thread

const { resolve } = require('path');
const { Worker, workerData } = require('worker_threads');

console.log('| Main Thread |');

function createWorker(path, cb) {
  const worker = new Worker(path, { workerData: null });

  worker.on('message', (msg) => {
    cb(null, msg);
  });
  worker.on('error', cb);
  worker.on('exit', (code) => {
    if (code !== 0) throw new Error('worker stopped');
    console.log('| Main Thread | worker stopped');
  });

  return worker;
}

const w = createWorker(resolve('child.js'), (err, res) => {
  // workerから結果を受け取る
  if (err) {
    console.error(err);
    process.exit(1);
  }

  console.log(`| Main Thread | execution time: ${res}ms from Worker Thread`);

  // workerを停止させるためにメッセージを送る
  w.postMessage('thx;)');
});
// child.js
// worker thread

const { PerformanceObserver, performance } = require('perf_hooks');
const { parentPort } = require('worker_threads');

console.log('| Worker |');

const obs = new PerformanceObserver((items) => {
  const time = items.getEntries()[0].duration;

  // main threadへ処理時間の結果を返す
  parentPort.postMessage(time);

  performance.clearMarks();
});
obs.observe({ entryTypes: ['measure'] });

const len = 64 * 1024 * 1024;
const b = Buffer.allocUnsafe(len);
let s = '';

// 重い処理なためこの箇所の実行時間を計測する
performance.mark('A');
for (let i = 0; i < 256; ++i) s += String.fromCharCode(i);
for (let i = 0; i < 64 * 1024 * 1024; i += 256) b.write(s, i, 256, 'ascii');
for (let i = 0; i < 32; ++i) b.toString('base64');
performance.mark('B');
performance.measure('A to B', 'A', 'B');

// main threadから返答が来たらこのworkerを終了する
parentPort.on('message', (msg) => {
  console.log(`| Worker | ${msg} from Main Thread`);
  process.exit();
});
# output
$ node --experimental-worker parent.js
| Main Thread |
| Worker |
| Main Thread | execution time: 3128.782205ms from Worker Thread
| Worker | thx;) from Main Thread
| Main Thread | worker stopped

パフォーマンス

    $ ./node benchmark/cluster/echo.js
    cluster/echo.js n=100000 sendsPerBroadcast=1 payload="string" workers=1: 33,647.30473442063
    cluster/echo.js n=100000 sendsPerBroadcast=10 payload="string" workers=1: 12,927.907405288383
    cluster/echo.js n=100000 sendsPerBroadcast=1 payload="object" workers=1: 28,496.37373941151
    cluster/echo.js n=100000 sendsPerBroadcast=10 payload="object" workers=1: 8,975.53747186485
    $ ./node --experimental-worker benchmark/worker/echo.js
    worker/echo.js n=100000 sendsPerBroadcast=1 payload="string" workers=1: 88,044.32902365089
    worker/echo.js n=100000 sendsPerBroadcast=10 payload="string" workers=1: 39,873.33697018837
    worker/echo.js n=100000 sendsPerBroadcast=1 payload="object" workers=1: 64,451.29132425621
    worker/echo.js n=100000 sendsPerBroadcast=10 payload="object" workers=1: 22,325.635443739284

まだオーバーヘッドが大きく、パフォーマンスには改善の余地があります。

まだできないこと

  • ネイティブアドオンをworkerからロードする
  • inspectorのサポート
  • handle(ポインタを示す)はネットワークソケット同様に転送ができない
  • FileクラスやBlobクラスやFile APIなどのサポート(対応するかは未定)
  • ...等々

また具体的な計画が出てないため、今後に期待。

さいごに

まだ入ったばかりでこれからどんどんおもしろくなるモジュールです!
パフォーマンスのチューニングには欠かせないモジュールなので、stability2に入ると今後使う機会が増えると思います。

なにか不明なことや近況が知りたい場合は、Twitterまでどうぞ。