应用例子

为了让node利用可能在多核服务器中进步性能,node提供cluster API,用于创立多个工作过程,而后由这些工作过程并行处理申请。

// master.jsconst cluster = require('cluster');const cpusLen = require('os').cpus().length;const path = require('path');console.log(`主过程:${process.pid}`);cluster.setupMaster({  exec: path.resolve(__dirname, './work.js'),});for (let i = 0; i < cpusLen; i++) {  cluster.fork();}// work.jsconst http = require('http');console.log(`工作过程:${process.pid}`);http.createServer((req, res) => {  res.end('hello');}).listen(8080);

下面例子中,应用cluster创立多个工作过程,这些工作过程可能共用8080端口,咱们申请localhost:8080,申请工作会交给其中一个工作过程进行解决,该工作过程解决实现后,自行响应申请。

端口占用问题

这里有个问题,后面例子中,呈现多个过程监听雷同的端口,为什么程序没有报端口占用问题,因为socket套接字监听端口会有一个文件描述符,而每个过程的文件描述符都不雷同,无奈让多个过程都监听同一个端口,如下:

// master.jsconst fork = require('child_process').fork;const cpusLen = require('os').cpus().length;const path = require('path');console.log(`主过程:${process.pid}`);for (let i = 0; i < cpusLen; i++) {  fork(path.resolve(__dirname, './work.js'));}// work.jsconst http = require('http');console.log(`工作过程:${process.pid}`);http.createServer((req, res) => {  res.end('hello');}).listen(8080);

当运行master.js文件的时候,会报端口被占用的问题(Error: listen EADDRINUSE: address already in use :::8080)。

咱们批改下,只应用主过程监听端口,主过程将申请套接字发放给工作过程,由工作过程来进行业务解决。

// master.jsconst fork = require('child_process').fork;const cpusLen = require('os').cpus().length;const path = require('path');const net = require('net');const server = net.createServer();console.log(`主过程:${process.pid}`);const works = [];let current = 0for (let i = 0; i < cpusLen; i++) {  works.push(fork(path.resolve(__dirname, './work.js')));}server.listen(8080, () => {  if (current > works.length - 1) current = 0  works[current++].send('server', server);  server.close();});// work.jsconst http = require('http');const server = http.createServer((req, res) => {  res.end('hello');});console.log(`工作过程:${process.pid}`);process.on('message', (type, tcp) => {  if (type === 'server') {    tcp.on('connection', socket => {      server.emit('connection', socket)    });  }})

实际上,cluster新建的工作过程并没有真正去监听端口,在工作过程中的net server listen函数会被hack,工作过程调用listen,不会有任何成果。监听端口工作交给了主过程,该端口对应的工作过程会被绑定到主过程中,当申请进来的时候,主过程会将申请的套接字下发给相应的工作过程,工作过程再对申请进行解决。

接下来咱们看看cluster API中的实现,看下cluster外部是如何做到上面两个性能:

  • 主过程:对传入的端口进行监听
  • 工作过程:

    • 主过程注册当前工作过程,如果主过程是第一次监听此端口,就新建一个TCP服务器,并将当前工作过程和TCP服务器绑定。
    • hack掉工作过程中的listen函数,让该过程不能监听端口

源码解读

本文应用的是node@14.15.4。

// lib/cluster.js'use strict';const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'primary';module.exports = require(`internal/cluster/${childOrPrimary}`);

这个是cluster API入口,在援用cluster的时候,程序首先会判断环境变量中是否存在NODE_UNIQUE_ID变量,来确定以后程序是在主过程运行还是工作过程中运行。NODE_UNIQUE_ID实际上就是一个自增的数字,是工作过程的ID,前面会在创立工作过程相干代码中看到,这里就不多做解释了。

通过后面代码咱们晓得,如果在主过程中援用cluster,程序导出的是internal/cluster/primary.js这文件,因而咱们先看看这个文件外部的一些实现。

// internal/cluster/primary.js// ...const EventEmitter = require('events');const cluster = new EventEmitter();// 上面这三个参数会在node外部性能实现的时候用到,之后咱们看net源码的时候会用到这些参数cluster.isWorker = false; // 是否是工作过程cluster.isMaster = true; // 是否是主过程cluster.isPrimary = true; // 是否是主过程module.exports = cluster;cluster.setupPrimary = function(options) {  const settings = {    args: ArrayPrototypeSlice(process.argv, 2),    exec: process.argv[1],    execArgv: process.execArgv,    silent: false,    ...cluster.settings,    ...options  };  cluster.settings = settings;  // ...}cluster.setupMaster = cluster.setupPrimary;cluster.fork = function(env) {  cluster.setupPrimary();  const id = ++ids;  const workerProcess = createWorkerProcess(id, env);}const { fork } = require('child_process');function createWorkerProcess(id, env) {  // 这里的NODE_UNIQUE_ID就是入口文件用来分辨以后过程类型用的  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };  // ...  return fork(cluster.settings.exec, cluster.settings.args, {    env: workerEnv,    // ...  });}

cluster.fork用来新建一个工作过程,其外部应用child_process中的fork函数,来创立一个过程,创立的新过程默认会运行命令行中执行的入口文件(process.argv[1]),当然咱们也能够执行luster.setupPrimary或者cluster.setupMaster并传入exec参数来批改工作过程执行的文件。

咱们再来简略看下工作过程援用的internal/cluster/child.js文件:

// internal/cluster/child.jsconst EventEmitter = require('events');const cluster = new EventEmitter();module.exports = cluster;// 这里定义的就是一个工作过程,后续会用到这里的参数cluster.isWorker = true;cluster.isMaster = false;cluster.isPrimary = false;cluster._getServer = function(obj, options, cb) {  // ...};// ...

这里咱们次要记住工作过程中的cluster有个_getServer函数,后续流程走到这个函数的时候,会具体看外面的代码。

接下来进入正题,看下net server listen函数:

// lib/net.jsServer.prototype.listen = function(...args) {  // ...  if (typeof options.port === 'number' || typeof options.port === 'string') {    // 如果是向最开始那种间接调用listen时间接传入一个端口,就会间接进入else,咱们也次要看else中的逻辑    if (options.host) {      // ...    } else  {      // listen(8080, () => {...})调用形式,将运行这条分支      listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive);    }    return this;  }  // ...}function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {  // ...  // 这里就用到cluster初始时写入的isPrimary参数,以后如果在主过程isPrimary就为true,反之为false。主过程会间接去执行server._listen2函数,工作过程之后也会执行这个函数,等下一起看server._listen2外部的性能。  if (cluster.isPrimary || exclusive) {    server._listen2(address, port, addressType, backlog, fd, flags);    return;  }  // 前面的代码只有在工作过程中才会执行  const serverQuery = {    address: address,    port: port,    addressType: addressType,    fd: fd,    flags,  };  // 这里执行的是internal/cluster/child.js中的cluster._getServer,同时会传入listenOnPrimaryHandle这个回调函数,这个回调函数会在主过程增加端口监听,同时将工作过程绑定到对应的TCP服务后才会执行,外面工作就是对net server listen等函数进行hack。  cluster._getServer(server, serverQuery, listenOnPrimaryHandle);  function listenOnPrimaryHandle(err, handle) {    // ...    server._handle = handle;    server._listen2(address, port, addressType, backlog, fd, flags);  }}// 等工作过程执行这个函数的时候再一起讲Server.prototype._listen2 = setupListenHandle;function setupListenHandle(...) {  // ...}

从下面代码中能够得悉,主过程和工作过程中执行net server listen都会进入到一个setupListenHandle函数中。不过区别是,主过程是间接执行该函数,而工作过程须要先执行cluster._getServer函数,让主过程监听工作过程端口,同时对listen函数进行hack解决,而后再执行setupListenHandle函数。接下来咱们看下cluster._getServer函数的外部实现。

// lib/internal/cluster/child.jscluster._getServer = function(obj, options, cb) {  // ...  // 这个是工作过程第一次发送内部消息的内容。  // 留神这里act值为queryServer  const message = {    act: 'queryServer',    index,    data: null,    ...options  };  // ...  // send函数外部应用IPC信道向工作过程发送内部消息。主过程在应用cluster.fork新建工作过程的时候,会让工作过程监听内部消息事件,上面会展现具体代码  // send调用传入的回调函数会被写入到lib/internal/cluster/utils.js文件中的callbacks map中,等前面要用的时候,再提取进去。  send(message, (reply, handle) => {    if (typeof obj._setServerData === 'function')      obj._setServerData(reply.data);    if (handle)      shared(reply, handle, indexesKey, index, cb);    else      // 这个函数外部会定义一个listen函数,用来hack net server listen函数      rr(reply, indexesKey, index, cb);  });  // ...}function send(message, cb) {  return sendHelper(process, message, null, cb);}
// lib/internal/cluster/utils.js// ...const callbacks = new SafeMap();let seq = 0;function sendHelper(proc, message, handle, cb) {  message = { cmd: 'NODE_CLUSTER', ...message, seq };  if (typeof cb === 'function')    // 这里将传入的回调函数记录下来。    // 留神这里的key是递增数字    callbacks.set(seq, cb);  seq += 1;  // 利用IPC信道,给当前工作过程发送内部消息  return proc.send(message, handle);}// ...

工作过程中cluster._getServer函数执行,将生成一个回调函数,将这个回调函数寄存起来,并且会应用IPC信道,向当前工作过程发送内部消息。主过程执行cluster.fork生成工作过程的时候,会在工作过程中注册internalMessage事件。接下来咱们看下cluster.fork中与工作过程注册内部消息事件的代码。

// internal/cluster/primary.jscluster.fork = function(env) {  // ...  // internal函数执行会返回一个接管message对象的回调函数。  // 能够先看下lib/internal/cluster/utils.js中的internal函数,理解外部的工作  worker.process.on('internalMessage', internal(worker, onmessage));  // ...}const methodMessageMapping = {  close,  exitedAfterDisconnect,  listening,  online,  queryServer,};// 第一次触发internalMessage执行的回调是这个函数。// 此时message的act为queryServerfunction onmessage(message, handle) {  // internal外部在执行onmessage时会将这个函数执行上下文绑定到工作过程的work上  const worker = this;  // 工作过程传入的  const fn = methodMessageMapping[message.act];  if (typeof fn === 'function')    fn(worker, message);}function queryServer(worker, message) {  // ...}
// lib/internal/cluster/utils.js// ...const callbacks = new SafeMap();function internal(worker, cb) {  return function onInternalMessage(message, handle) {    let fn = cb;    // 工作过程第一次发送内部消息:ack为undefined,callback为undefined,间接执行internal调用传入的onmessage函数,message函数只是用于解析音讯的,理论会执行queryServer函数    // 工作过程第二次发送内部消息:主过程queryServer函数执行会用工作过程发送内部消息,并向message中增加ack参数,让message.ack=message.seq    if (message.ack !== undefined) {      const callback = callbacks.get(message.ack);      if (callback !== undefined) {        fn = callback;        callbacks.delete(message.ack);      }    }    ReflectApply(fn, worker, arguments);  };}

工作过程第一次发送内部消息时,因为传入的message.ack(这里留神分清actack)为undefind,因而没方法间接拿到cluster._getServer中调用send写入的回调函数,因而只能先执行internal/cluster/primary.js中的queryServer函数。接下来看下queryServer函数外部逻辑。

// internal/cluster/primary.js// hadles中寄存的就是TCP服务器。// 主过程在代替工作过程监听端口生成新的TCP服务器前,// 须要先判断该服务器是否有创立,如果有,就间接复用之前的服务器,而后将工作过程绑定到相应的服务器上;如果没有,就新建一个TCP服务器,而后将工作过程绑定到新建的服务器上。function queryServer(worker, message) {  // 这里key就是服务器的惟一标识  const key = `${message.address}:${message.port}:${message.addressType}:` +              `${message.fd}:${message.index}`;  // 从现存的服务器中查看是否有以后须要的服务器  let handle = handles.get(key);  // 如果没有须要的服务器,就新建一个  if (handle === undefined) {    // ...    // RoundRobinHandle构建函数中,会新建一个TCP服务器    let constructor = RoundRobinHandle;    handle = new constructor(key, address, message);    // 将这个服务器寄存起来    handles.set(key, handle);  }  if (!handle.data)    handle.data = message.data;  // 能够先看下上面对于RoundRobinHandle构建函数的代码,理解外部机制  handle.add(worker, (errno, reply, handle) => {    const { data } = handles.get(key);    if (errno)      handles.delete(key);    // 这里会向工作过程中发送第二次内部消息。    // 这里只传了worker和message,没有传入handle和cb    send(worker, {      errno,      key,      ack: message.seq, // 留神这里减少了ack属性      data,      ...reply    }, handle);  });}function send(worker, message, handle, cb) {  return sendHelper(worker.process, message, handle, cb);}
// internal/cluster/round_robin_handle.jsfunction RoundRobinHandle(key, address, { port, fd, flags }) {  // ...  this.server = net.createServer(assert.fail);  if (fd >= 0)    this.server.listen({ fd });  else if (port >= 0) {    this.server.listen({      port,      host: address,      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),    });  } else    this.server.listen(address);  // 当服务处于监听状态,就会执行这个回调。  this.server.once('listening', () => {    this.handle = this.server._handle;    this.handle.onconnection = (err, handle) => this.distribute(err, handle);    this.server._handle = null;    // 留神:如果监听胜利,就会将server删除    this.server = null;  });}RoundRobinHandle.prototype.add = function(worker, send) {  const done = () => {    if (this.handle.getsockname) {      // ...      send(null, { sockname: out }, null);    } else {      send(null, null, null);  // UNIX socket.    }    // ...  };  // 如果在add执行前server就曾经处于listening状态,this.server就会为null  if (this.server === null)    return done();  // 如果add执行后,server才处于listening,就会走到这里,始终都会执行add调用时传入的回调  this.server.once('listening', done);}

在这一步,主过程替工作过程生成或者是获取了一个可用的TCP服务器,并将工作过程与相应的服务器绑定在一起(不便后续申请任务分配)。当工作过程绑定实现当前,就向工作过程中发送了第二次内部消息,接下来咱们再次进入lib/internal/cluster/utils.js看看外部流程:

// lib/internal/cluster/utils.jsconst callbacks = new SafeMap();function internal(worker, cb) {  // 留神这里handle为undefined  return function onInternalMessage(message, handle) {    let fn = cb;    // 第二次工作过程内部消息执行的时候message.ack曾经被赋值为message.seq    // 因而这次可能获取到之前lib/cluster.child.js cluster._getServer函数执行是调用send写入的回调函数    if (message.ack !== undefined) {      const callback = callbacks.get(message.ack);      if (callback !== undefined) {        fn = callback;        callbacks.delete(message.ack);      }    }    ReflectApply(fn, worker, arguments);  };}

工作过程第二次承受到内部消息时,cluster._getServer函数执行是调用send写入的回调函数会被执行,接下来看下send写入的回调函数内容:

// lib/internal/cluster/child.jssend(message, (reply, handle) => {  // 此时handle为undefined,流程会间接运行rr函数  if (handle)    shared(reply, handle, indexesKey, index, cb);   else    // 这里的cb是lib/net.js在执行cluster._getServer时传入listenOnPrimaryHandle函数,前面会介绍他的工作。    rr(reply, indexesKey, index, cb);});function rr(message, indexesKey, index, cb) {  let key = message.key;  // 这里定义的listen用于hack net server.listen,在工作过程中执行listen,工作过程并不会真正去监听端口  function listen(backlog) {    return 0;  }  function close() {...}  function getsockname(out) {...}  const handle = { close, listen, ref: noop, unref: noop };  handles.set(key, handle);  // 执行传入的listenOnPrimaryHandle函数  cb(0, handle);}

rr函数执行,会新建几个与net server中同名的函数,并通过handle传入listenOnPrimaryHandle函数。

// lib/net.jsfunction listenInCluster(...) {  cluster._getServer(server, serverQuery, listenOnPrimaryHandle);  // listenOnPrimaryHandle函数中将工作过程生成的server._handle对象替换成自定义的handle对象,后续server listen执行的就是server._handle中的listen函数,因而这里就实现了对工作过程中的listen函数hack  function listenOnPrimaryHandle(err, handle) {    // ...    // handle:{ listen: ..., close: ...., ... }    server._handle = handle;    server._listen2(address, port, addressType, backlog, fd, flags);  }}

上面看下server._listen2函数执行内容

Server.prototype._listen2 = setupListenHandle;function setupListenHandle(address, port, addressType, backlog, fd, flags) {  // 疏忽,只有是从工作过程进来的,this._handle就是本人定义的对象内容  if (this._handle) {    debug('setupListenHandle: have a handle already');  } else {    // 主过程会进入这一层逻辑,会在这里生成一个服务器    // ...    rval = createServerHandle(address, port, addressType, fd, flags);    // ...    this._handle = rval;  }  const err = this._handle.listen(backlog || 511);  // ...}

至此,工作过程端口监听相干的源码就看完了,当初差不多理解到工作过程中执行net server listen时,工作过程并不会真正去监听端口,端口监听工作始终会交给主过程来实现。主过程在接到工作过程发来的端口监听的时候,首先会判断是否有雷同的服务器,如果有,就间接将工作过程绑定到对应的服务器上,这样就不会呈现端口被占用的问题;如果没有对应的服务器,就生成一个新的服务。主过程承受到申请的时候,就会将申请任务分配给工作过程,如何调配,就须要看具体应用的哪种负载平衡了。