[toc]
nodejs 中应用 worker_threads 来创立新的线程
简介
之前的文章中提到了,nodejs 中有两种线程,一种是 event loop 用来相应用户的申请和解决各种 callback。另一种就是 worker pool 用来解决各种耗时操作。
nodejs 的官网提到了一个可能应用 nodejs 本地 woker pool 的 lib 叫做 webworker-threads。
惋惜的是 webworker-threads 的最初一次更新还是在 2 年前,而在最新的 nodejs 12 中,根本无法应用。
而 webworker-threads 的作者则举荐了一个新的 lib 叫做 web-worker。
web-worker 是构建于 nodejs 的 worker_threads 之上的,本文将会具体解说 worker_threads 和 web-worker 的应用。
worker_threads
worker_threads 模块的源代码源自 lib/worker_threads.js,它指的是工作线程,能够开启一个新的线程来并行执行 javascript 程序。
worker_threads 次要用来解决 CPU 密集型操作, 而不是 IO 操作,因为 nodejs 自身的异步 IO 曾经十分弱小了。
worker_threads 中次要有 5 个属性,3 个 class 和 3 个次要的办法。接下来咱们将会一一解说。
isMainThread
isMainThread 用来判断代码是否在主线程中运行,咱们看一个应用的例子:
const {Worker, isMainThread} = require('worker_threads');
if (isMainThread) {console.log('在主线程中');
new Worker(__filename);
} else {console.log('在工作线程中');
console.log(isMainThread); // 打印 'false'。}
下面的例子中,咱们从 worker_threads 模块中引入了 Worker 和 isMainThread,Worker 就是工作线程的主类,咱们将会在前面具体解说,这里咱们应用 Worker 创立了一个工作线程。
MessageChannel
MessageChannel 代表的是一个异步双向通信 channel。MessageChannel 中没有办法,次要通过 MessageChannel 来连贯两端的 MessagePort。
class MessageChannel {
readonly port1: MessagePort;
readonly port2: MessagePort;
}
当咱们应用 new MessageChannel() 的时候,会主动创立两个 MessagePort。
const {MessageChannel} = require('worker_threads');
const {port1, port2} = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({foo: 'bar'});
// Prints: received {foo: 'bar'} from the `port1.on('message')` listener
通过 MessageChannel,咱们能够进行 MessagePort 间的通信。
parentPort 和 MessagePort
parentPort 是一个 MessagePort 类型,parentPort 次要用于 worker 线程和主线程进行音讯交互。
通过 parentPort.postMessage() 发送的音讯在主线程中将能够通过 worker.on(‘message’) 接管。
主线程中通过 worker.postMessage() 发送的音讯将能够在工作线程中通过 parentPort.on(‘message’) 接管。
咱们看一下 MessagePort 的定义:
class MessagePort extends EventEmitter {close(): void;
postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
ref(): void;
unref(): void;
start(): void;
addListener(event: "close", listener: () => void): this;
addListener(event: "message", listener: (value: any) => void): this;
addListener(event: string | symbol, listener: (...args: any[]) => void): this;
emit(event: "close"): boolean;
emit(event: "message", value: any): boolean;
emit(event: string | symbol, ...args: any[]): boolean;
on(event: "close", listener: () => void): this;
on(event: "message", listener: (value: any) => void): this;
on(event: string | symbol, listener: (...args: any[]) => void): this;
once(event: "close", listener: () => void): this;
once(event: "message", listener: (value: any) => void): this;
once(event: string | symbol, listener: (...args: any[]) => void): this;
prependListener(event: "close", listener: () => void): this;
prependListener(event: "message", listener: (value: any) => void): this;
prependListener(event: string | symbol, listener: (...args: any[]) => void): this;
prependOnceListener(event: "close", listener: () => void): this;
prependOnceListener(event: "message", listener: (value: any) => void): this;
prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;
removeListener(event: "close", listener: () => void): this;
removeListener(event: "message", listener: (value: any) => void): this;
removeListener(event: string | symbol, listener: (...args: any[]) => void): this;
off(event: "close", listener: () => void): this;
off(event: "message", listener: (value: any) => void): this;
off(event: string | symbol, listener: (...args: any[]) => void): this;
}
MessagePort 继承自 EventEmitter,它示意的是异步双向通信 channel 的一端。这个 channel 就叫做 MessageChannel,MessagePort 通过 MessageChannel 来进行通信。
咱们能够通过 MessagePort 来传输构造体数据,内存区域或者其余的 MessagePorts。
从源代码中,咱们能够看到 MessagePort 中有两个事件,close 和 message。
close 事件将会在 channel 的中任何一端断开连接的时候触发,而 message 事件将会在 port.postMessage 时候触发,上面咱们看一个例子:
const {MessageChannel} = require('worker_threads');
const {port1, port2} = new MessageChannel();
// Prints:
// foobar
// closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));
port1.postMessage('foobar');
port1.close();
port.on(‘message’) 实际上为 message 事件增加了一个 listener,port 还提供了 addListener 办法来手动增加 listener。
port.on(‘message’) 会主动触发 port.start()办法,示意启动一个 port。
当 port 有 listener 存在的时候,这示意 port 存在一个 ref,当存在 ref 的时候,程序是不会完结的。咱们能够通过调用 port.unref 办法来勾销这个 ref。
接下来咱们看一下怎么通过 port 来传输音讯:
port.postMessage(value[, transferList])
postMessage 能够承受两个参数,第一个参数是 value,这是一个 JavaScript 对象。第二个参数是 transferList。
先看一个传递一个参数的状况:
const {MessageChannel} = require('worker_threads');
const {port1, port2} = new MessageChannel();
port1.on('message', (message) => console.log(message));
const circularData = {};
circularData.foo = circularData;
// Prints: {foo: [Circular] }
port2.postMessage(circularData);
通常来说 postMessage 发送的对象都是 value 的拷贝,然而如果你指定了 transferList,那么在 transferList 中的对象将会被 transfer 到 channel 的承受端,并且不再存在于发送端,就如同把对象传送进来一样。
transferList 是一个 list,list 中的对象能够是 ArrayBuffer, MessagePort 和 FileHandle。
如果 value 中蕴含 SharedArrayBuffer 对象,那么该对象不能被蕴含在 transferList 中。
看一个蕴含两个参数的例子:
const {MessageChannel} = require('worker_threads');
const {port1, port2} = new MessageChannel();
port1.on('message', (message) => console.log(message));
const uint8Array = new Uint8Array([1, 2, 3, 4]);
// post uint8Array 的拷贝:
port2.postMessage(uint8Array);
port2.postMessage(uint8Array, [ uint8Array.buffer]);
//port2.postMessage(uint8Array);
下面的例子将输入:
Uint8Array(4) [1, 2, 3, 4]
Uint8Array(4) [1, 2, 3, 4]
第一个 postMessage 是拷贝,第二个 postMessage 是 transfer Uint8Array 底层的 buffer。
如果咱们再次调用 port2.postMessage(uint8Array),咱们会失去上面的谬误:
DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.
buffer 是 TypedArray 的底层存储构造,如果 buffer 被 transfer,那么之前的 TypedArray 将会变得不可用。
markAsUntransferable
要想防止这个问题,咱们能够调用 markAsUntransferable 将 buffer 标记为不可 transferable. 咱们看一个 markAsUntransferable 的例子:
const {MessageChannel, markAsUntransferable} = require('worker_threads');
const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);
markAsUntransferable(pooledBuffer);
const {port1} = new MessageChannel();
port1.postMessage(typedArray1, [ typedArray1.buffer]);
console.log(typedArray1);
console.log(typedArray2);
SHARE_ENV
SHARE_ENV 是传递给 worker 构造函数的一个 env 变量,通过设置这个变量,咱们能够在主线程与工作线程进行共享环境变量的读写。
const {Worker, SHARE_ENV} = require('worker_threads');
new Worker('process.env.SET_IN_WORKER ="foo"', { eval: true, env: SHARE_ENV})
.on('exit', () => {console.log(process.env.SET_IN_WORKER); // Prints 'foo'.
});
workerData
除了 postMessage(),还能够通过在主线程中传递 workerData 给 worker 的构造函数,从而将主线程中的数据传递给 worker:
const {Worker, isMainThread, workerData} = require('worker_threads');
if (isMainThread) {const worker = new Worker(__filename, { workerData: 'Hello, world!'});
} else {console.log(workerData); // Prints 'Hello, world!'.
}
worker 类
先看一下 worker 的定义:
class Worker extends EventEmitter {
readonly stdin: Writable | null;
readonly stdout: Readable;
readonly stderr: Readable;
readonly threadId: number;
readonly resourceLimits?: ResourceLimits;
constructor(filename: string | URL, options?: WorkerOptions);
postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
ref(): void;
unref(): void;
terminate(): Promise<number>;
getHeapSnapshot(): Promise<Readable>;
addListener(event: "error", listener: (err: Error) => void): this;
addListener(event: "exit", listener: (exitCode: number) => void): this;
addListener(event: "message", listener: (value: any) => void): this;
addListener(event: "online", listener: () => void): this;
addListener(event: string | symbol, listener: (...args: any[]) => void): this;
...
}
worker 继承自 EventEmitter,并且蕴含了 4 个重要的事件:error,exit,message 和 online。
worker 示意的是一个独立的 JavaScript 执行线程,咱们能够通过传递 filename 或者 URL 来结构 worker。
每一个 worker 都有一对内置的 MessagePort,在 worker 创立的时候就会互相关联。worker 应用这对内置的 MessagePort 来和父线程进行通信。
通过 parentPort.postMessage() 发送的音讯在主线程中将能够通过 worker.on(‘message’) 接管。
主线程中通过 worker.postMessage() 发送的音讯将能够在工作线程中通过 parentPort.on(‘message’) 接管。
当然,你也能够显式的创立 MessageChannel 对象,而后将 MessagePort 作为消息传递给其余线程,咱们看一个例子:
const assert = require('assert');
const {Worker, MessageChannel, MessagePort, isMainThread, parentPort} = require('worker_threads');
if (isMainThread) {const worker = new Worker(__filename);
const subChannel = new MessageChannel();
worker.postMessage({hereIsYourPort: subChannel.port1}, [subChannel.port1]);
subChannel.port2.on('message', (value) => {console.log('接管到:', value);
});
} else {parentPort.once('message', (value) => {assert(value.hereIsYourPort instanceof MessagePort);
value.hereIsYourPort.postMessage('工作线程正在发送此音讯');
value.hereIsYourPort.close();});
}
下面的例子中,咱们借助了 worker 和 parentPort 自身的消息传递性能,传递了一个显式的 MessageChannel 中的 MessagePort。
而后又通过该 MessagePort 来进行音讯的散发。
receiveMessageOnPort
除了 port 的 on(‘message’) 办法之外,咱们还能够应用 receiveMessageOnPort 来手动接管音讯:
const {MessageChannel, receiveMessageOnPort} = require('worker_threads');
const {port1, port2} = new MessageChannel();
port1.postMessage({hello: 'world'});
console.log(receiveMessageOnPort(port2));
// Prints: {message: { hello: 'world'} }
console.log(receiveMessageOnPort(port2));
// Prints: undefined
moveMessagePortToContext
先理解一下 nodejs 中的 Context 的概念,咱们能够从 vm 中创立 context,它是一个隔离的上下文环境,从而保障不同运行环境的安全性,咱们看一个 context 的例子:
const vm = require('vm');
const x = 1;
const context = {x: 2};
vm.createContext(context); // 上下文隔离化对象。const code = 'x += 40; var y = 17;';
// `x` and `y` 是上下文中的全局变量。// 最后,x 的值为 2,因为这是 context.x 的值。vm.runInContext(code, context);
console.log(context.x); // 42
console.log(context.y); // 17
console.log(x); // 1; y 没有定义。
在 worker 中,咱们能够将一个 MessagePort move 到其余的 context 中。
worker.moveMessagePortToContext(port, contextifiedSandbox)
这个办法接管两个参数,第一个参数就是要 move 的 MessagePort, 第二个参数就是 vm.createContext() 创立的 context 对象。
worker_threads 的线程池
下面咱们提到了应用单个的 worker thread,然而当初程序中一个线程往往是不够的,咱们须要创立一个线程池来保护 worker thread 对象。
nodejs 提供了 AsyncResource 类,来作为对异步资源的扩大。
AsyncResource 类是 async_hooks 模块中的。
上面咱们看下怎么应用 AsyncResource 类来创立 worker 的线程池。
假如咱们有一个 task,应用来执行两个数相加,脚本名字叫做 task_processor.js:
const {parentPort} = require('worker_threads');
parentPort.on('message', (task) => {parentPort.postMessage(task.a + task.b);
});
上面是 worker pool 的实现:
const {AsyncResource} = require('async_hooks');
const {EventEmitter} = require('events');
const path = require('path');
const {Worker} = require('worker_threads');
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {constructor(callback) {super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.}
}
class WorkerPool extends EventEmitter {constructor(numThreads) {super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();}
addNewWorker() {const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {for (const worker of this.workers) worker.terminate();}
}
module.exports = WorkerPool;
咱们给 worker 创立了一个新的 kTaskInfo 属性,并且将异步的 callback 封装到 WorkerPoolTaskInfo 中,赋值给 worker.kTaskInfo.
接下来咱们就能够应用 workerPool 了:
const WorkerPool = require('./worker_pool.js');
const os = require('os');
const pool = new WorkerPool(os.cpus().length);
let finished = 0;
for (let i = 0; i < 10; i++) {pool.runTask({ a: 42, b: 100}, (err, result) => {console.log(i, err, result);
if (++finished === 10)
pool.close();});
}
本文作者:flydean 程序那些事
本文链接:http://www.flydean.com/nodejs-worker-thread/
本文起源:flydean 的博客
欢送关注我的公众号:「程序那些事」最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!