[toc]

nodejs中应用worker_threads来创立新的线程

简介

之前的文章中提到了,nodejs中有两种线程,一种是event loop用来相应用户的申请和解决各种callback。另一种就是worker pool用来解决各种耗时操作。

nodejs的官网提到了一个可能应用nodejs本地woker pool的lib叫做webworker-threads。

惋惜的是webworker-threads的最初一次更新还是在2年前,而在最新的nodejs 12中,根本无法应用。

而webworker-threads的作者则举荐了一个新的lib叫做web-worker。

web-worker是构建于nodejs的worker_threads之上的,本文将会具体解说worker_threads和web-worker的应用。

worker_threads

worker_threads模块的源代码源自lib/worker_threads.js,它指的是工作线程,能够开启一个新的线程来并行执行javascript程序。

worker_threads次要用来解决CPU密集型操作,而不是IO操作,因为nodejs自身的异步IO曾经十分弱小了。

worker_threads中次要有5个属性,3个class和3个次要的办法。接下来咱们将会一一解说。

isMainThread

isMainThread用来判断代码是否在主线程中运行,咱们看一个应用的例子:

const { Worker, isMainThread } = require('worker_threads');if (isMainThread) {    console.log('在主线程中');  new Worker(__filename);} else {  console.log('在工作线程中');  console.log(isMainThread);  // 打印 'false'。}

下面的例子中,咱们从worker_threads模块中引入了Worker和isMainThread,Worker就是工作线程的主类,咱们将会在前面具体解说,这里咱们应用Worker创立了一个工作线程。

MessageChannel

MessageChannel代表的是一个异步双向通信channel。MessageChannel中没有办法,次要通过MessageChannel来连贯两端的MessagePort。

    class MessageChannel {        readonly port1: MessagePort;        readonly port2: MessagePort;    }

当咱们应用new MessageChannel()的时候,会主动创立两个MessagePort。

const { MessageChannel } = require('worker_threads');const { port1, port2 } = new MessageChannel();port1.on('message', (message) => console.log('received', message));port2.postMessage({ foo: 'bar' });// Prints: received { foo: 'bar' } from the `port1.on('message')` listener

通过MessageChannel,咱们能够进行MessagePort间的通信。

parentPort和MessagePort

parentPort是一个MessagePort类型,parentPort次要用于worker线程和主线程进行音讯交互。

通过parentPort.postMessage()发送的音讯在主线程中将能够通过worker.on('message')接管。

主线程中通过worker.postMessage()发送的音讯将能够在工作线程中通过parentPort.on('message')接管。

咱们看一下MessagePort的定义:

class MessagePort extends EventEmitter {        close(): void;        postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;        ref(): void;        unref(): void;        start(): void;        addListener(event: "close", listener: () => void): this;        addListener(event: "message", listener: (value: any) => void): this;        addListener(event: string | symbol, listener: (...args: any[]) => void): this;        emit(event: "close"): boolean;        emit(event: "message", value: any): boolean;        emit(event: string | symbol, ...args: any[]): boolean;        on(event: "close", listener: () => void): this;        on(event: "message", listener: (value: any) => void): this;        on(event: string | symbol, listener: (...args: any[]) => void): this;        once(event: "close", listener: () => void): this;        once(event: "message", listener: (value: any) => void): this;        once(event: string | symbol, listener: (...args: any[]) => void): this;        prependListener(event: "close", listener: () => void): this;        prependListener(event: "message", listener: (value: any) => void): this;        prependListener(event: string | symbol, listener: (...args: any[]) => void): this;        prependOnceListener(event: "close", listener: () => void): this;        prependOnceListener(event: "message", listener: (value: any) => void): this;        prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;        removeListener(event: "close", listener: () => void): this;        removeListener(event: "message", listener: (value: any) => void): this;        removeListener(event: string | symbol, listener: (...args: any[]) => void): this;        off(event: "close", listener: () => void): this;        off(event: "message", listener: (value: any) => void): this;        off(event: string | symbol, listener: (...args: any[]) => void): this;    }

MessagePort继承自EventEmitter,它示意的是异步双向通信channel的一端。这个channel就叫做MessageChannel,MessagePort通过MessageChannel来进行通信。

咱们能够通过MessagePort来传输构造体数据,内存区域或者其余的MessagePorts。

从源代码中,咱们能够看到MessagePort中有两个事件,close和message。

close事件将会在channel的中任何一端断开连接的时候触发,而message事件将会在port.postMessage时候触发,上面咱们看一个例子:

const { MessageChannel } = require('worker_threads');const { port1, port2 } = new MessageChannel();// Prints://   foobar//   closed!port2.on('message', (message) => console.log(message));port2.on('close', () => console.log('closed!'));port1.postMessage('foobar');port1.close();

port.on('message')实际上为message事件增加了一个listener,port还提供了addListener办法来手动增加listener。

port.on('message')会主动触发port.start()办法,示意启动一个port。

当port有listener存在的时候,这示意port存在一个ref,当存在ref的时候,程序是不会完结的。咱们能够通过调用port.unref办法来勾销这个ref。

接下来咱们看一下怎么通过port来传输音讯:

port.postMessage(value[, transferList])

postMessage能够承受两个参数,第一个参数是value,这是一个JavaScript对象。第二个参数是transferList。

先看一个传递一个参数的状况:

const { MessageChannel } = require('worker_threads');const { port1, port2 } = new MessageChannel();port1.on('message', (message) => console.log(message));const circularData = {};circularData.foo = circularData;// Prints: { foo: [Circular] }port2.postMessage(circularData);

通常来说postMessage发送的对象都是value的拷贝,然而如果你指定了transferList,那么在transferList中的对象将会被transfer到channel的承受端,并且不再存在于发送端,就如同把对象传送进来一样。

transferList是一个list,list中的对象能够是ArrayBuffer, MessagePort 和 FileHandle。

如果value中蕴含SharedArrayBuffer对象,那么该对象不能被蕴含在transferList中。

看一个蕴含两个参数的例子:

const { MessageChannel } = require('worker_threads');const { port1, port2 } = new MessageChannel();port1.on('message', (message) => console.log(message));const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);// post uint8Array的拷贝:port2.postMessage(uint8Array);port2.postMessage(uint8Array, [ uint8Array.buffer ]);//port2.postMessage(uint8Array);

下面的例子将输入:

Uint8Array(4) [ 1, 2, 3, 4 ]Uint8Array(4) [ 1, 2, 3, 4 ]

第一个postMessage是拷贝,第二个postMessage是transfer Uint8Array底层的buffer。

如果咱们再次调用port2.postMessage(uint8Array),咱们会失去上面的谬误:

DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.

buffer是TypedArray的底层存储构造,如果buffer被transfer,那么之前的TypedArray将会变得不可用。

markAsUntransferable

要想防止这个问题,咱们能够调用markAsUntransferable将buffer标记为不可transferable. 咱们看一个markAsUntransferable的例子:

const { MessageChannel, markAsUntransferable } = require('worker_threads');const pooledBuffer = new ArrayBuffer(8);const typedArray1 = new Uint8Array(pooledBuffer);const typedArray2 = new Float64Array(pooledBuffer);markAsUntransferable(pooledBuffer);const { port1 } = new MessageChannel();port1.postMessage(typedArray1, [ typedArray1.buffer ]);console.log(typedArray1);console.log(typedArray2);

SHARE_ENV

SHARE_ENV是传递给worker构造函数的一个env变量,通过设置这个变量,咱们能够在主线程与工作线程进行共享环境变量的读写。

const { Worker, SHARE_ENV } = require('worker_threads');new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })  .on('exit', () => {    console.log(process.env.SET_IN_WORKER);  // Prints 'foo'.  });

workerData

除了postMessage(),还能够通过在主线程中传递workerData给worker的构造函数,从而将主线程中的数据传递给worker:

const { Worker, isMainThread, workerData } = require('worker_threads');if (isMainThread) {  const worker = new Worker(__filename, { workerData: 'Hello, world!' });} else {  console.log(workerData);  // Prints 'Hello, world!'.}

worker类

先看一下worker的定义:

    class Worker extends EventEmitter {        readonly stdin: Writable | null;        readonly stdout: Readable;        readonly stderr: Readable;        readonly threadId: number;        readonly resourceLimits?: ResourceLimits;        constructor(filename: string | URL, options?: WorkerOptions);        postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;        ref(): void;        unref(): void;        terminate(): Promise<number>;        getHeapSnapshot(): Promise<Readable>;        addListener(event: "error", listener: (err: Error) => void): this;        addListener(event: "exit", listener: (exitCode: number) => void): this;        addListener(event: "message", listener: (value: any) => void): this;        addListener(event: "online", listener: () => void): this;        addListener(event: string | symbol, listener: (...args: any[]) => void): this;       ...     }

worker继承自EventEmitter,并且蕴含了4个重要的事件:error,exit,message和online。

worker示意的是一个独立的 JavaScript 执行线程,咱们能够通过传递filename或者URL来结构worker。

每一个worker都有一对内置的MessagePort,在worker创立的时候就会互相关联。worker应用这对内置的MessagePort来和父线程进行通信。

通过parentPort.postMessage()发送的音讯在主线程中将能够通过worker.on('message')接管。

主线程中通过worker.postMessage()发送的音讯将能够在工作线程中通过parentPort.on('message')接管。

当然,你也能够显式的创立MessageChannel 对象,而后将MessagePort作为消息传递给其余线程,咱们看一个例子:

const assert = require('assert');const {  Worker, MessageChannel, MessagePort, isMainThread, parentPort} = require('worker_threads');if (isMainThread) {  const worker = new Worker(__filename);  const subChannel = new MessageChannel();  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);  subChannel.port2.on('message', (value) => {    console.log('接管到:', value);  });} else {  parentPort.once('message', (value) => {    assert(value.hereIsYourPort instanceof MessagePort);    value.hereIsYourPort.postMessage('工作线程正在发送此音讯');    value.hereIsYourPort.close();  });}

下面的例子中,咱们借助了worker和parentPort自身的消息传递性能,传递了一个显式的MessageChannel中的MessagePort。

而后又通过该MessagePort来进行音讯的散发。

receiveMessageOnPort

除了port的on('message')办法之外,咱们还能够应用receiveMessageOnPort来手动接管音讯:

const { MessageChannel, receiveMessageOnPort } = require('worker_threads');const { port1, port2 } = new MessageChannel();port1.postMessage({ hello: 'world' });console.log(receiveMessageOnPort(port2));// Prints: { message: { hello: 'world' } }console.log(receiveMessageOnPort(port2));// Prints: undefined

moveMessagePortToContext

先理解一下nodejs中的Context的概念,咱们能够从vm中创立context,它是一个隔离的上下文环境,从而保障不同运行环境的安全性,咱们看一个context的例子:

const vm = require('vm');const x = 1;const context = { x: 2 };vm.createContext(context); // 上下文隔离化对象。const code = 'x += 40; var y = 17;';// `x` and `y` 是上下文中的全局变量。// 最后,x 的值为 2,因为这是 context.x 的值。vm.runInContext(code, context);console.log(context.x); // 42console.log(context.y); // 17console.log(x); // 1; y 没有定义。

在worker中,咱们能够将一个MessagePort move到其余的context中。

worker.moveMessagePortToContext(port, contextifiedSandbox)

这个办法接管两个参数,第一个参数就是要move的MessagePort,第二个参数就是vm.createContext()创立的context对象。

worker_threads的线程池

下面咱们提到了应用单个的worker thread,然而当初程序中一个线程往往是不够的,咱们须要创立一个线程池来保护worker thread对象。

nodejs提供了AsyncResource类,来作为对异步资源的扩大。

AsyncResource类是async_hooks模块中的。

上面咱们看下怎么应用AsyncResource类来创立worker的线程池。

假如咱们有一个task,应用来执行两个数相加,脚本名字叫做task_processor.js:

const { parentPort } = require('worker_threads');parentPort.on('message', (task) => {  parentPort.postMessage(task.a + task.b);});

上面是worker pool的实现:

const { AsyncResource } = require('async_hooks');const { EventEmitter } = require('events');const path = require('path');const { Worker } = require('worker_threads');const kTaskInfo = Symbol('kTaskInfo');const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');class WorkerPoolTaskInfo extends AsyncResource {  constructor(callback) {    super('WorkerPoolTaskInfo');    this.callback = callback;  }  done(err, result) {    this.runInAsyncScope(this.callback, null, err, result);    this.emitDestroy();  // `TaskInfo`s are used only once.  }}class WorkerPool extends EventEmitter {  constructor(numThreads) {    super();    this.numThreads = numThreads;    this.workers = [];    this.freeWorkers = [];    for (let i = 0; i < numThreads; i++)      this.addNewWorker();  }  addNewWorker() {    const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));    worker.on('message', (result) => {      // In case of success: Call the callback that was passed to `runTask`,      // remove the `TaskInfo` associated with the Worker, and mark it as free      // again.      worker[kTaskInfo].done(null, result);      worker[kTaskInfo] = null;      this.freeWorkers.push(worker);      this.emit(kWorkerFreedEvent);    });    worker.on('error', (err) => {      // In case of an uncaught exception: Call the callback that was passed to      // `runTask` with the error.      if (worker[kTaskInfo])        worker[kTaskInfo].done(err, null);      else        this.emit('error', err);      // Remove the worker from the list and start a new Worker to replace the      // current one.      this.workers.splice(this.workers.indexOf(worker), 1);      this.addNewWorker();    });    this.workers.push(worker);    this.freeWorkers.push(worker);    this.emit(kWorkerFreedEvent);  }  runTask(task, callback) {    if (this.freeWorkers.length === 0) {      // No free threads, wait until a worker thread becomes free.      this.once(kWorkerFreedEvent, () => this.runTask(task, callback));      return;    }    const worker = this.freeWorkers.pop();    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);    worker.postMessage(task);  }  close() {    for (const worker of this.workers) worker.terminate();  }}module.exports = WorkerPool;

咱们给worker创立了一个新的kTaskInfo属性,并且将异步的callback封装到WorkerPoolTaskInfo中,赋值给worker.kTaskInfo.

接下来咱们就能够应用workerPool了:

const WorkerPool = require('./worker_pool.js');const os = require('os');const pool = new WorkerPool(os.cpus().length);let finished = 0;for (let i = 0; i < 10; i++) {  pool.runTask({ a: 42, b: 100 }, (err, result) => {    console.log(i, err, result);    if (++finished === 10)      pool.close();  });}
本文作者:flydean程序那些事

本文链接:http://www.flydean.com/nodejs-worker-thread/

本文起源:flydean的博客

欢送关注我的公众号:「程序那些事」最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!