应用例子
为了让 node 利用可能在多核服务器中进步性能,node 提供cluster
API,用于创立多个工作过程,而后由这些工作过程并行处理申请。
// master.js
const cluster = require('cluster');
const cpusLen = require('os').cpus().length;
const path = require('path');
console.log(` 主过程:${process.pid}`);
cluster.setupMaster({exec: path.resolve(__dirname, './work.js'),
});
for (let i = 0; i < cpusLen; i++) {cluster.fork();
}
// work.js
const http = require('http');
console.log(` 工作过程:${process.pid}`);
http.createServer((req, res) => {res.end('hello');
}).listen(8080);
下面例子中,应用 cluster
创立多个工作过程,这些工作过程可能共用 8080
端口,咱们申请localhost:8080
,申请工作会交给其中一个工作过程进行解决,该工作过程解决实现后,自行响应申请。
端口占用问题
这里有个问题,后面例子中,呈现多个过程监听雷同的端口,为什么程序没有报端口占用问题,因为 socket 套接字监听端口会有一个文件描述符,而每个过程的文件描述符都不雷同,无奈让多个过程都监听同一个端口,如下:
// master.js
const fork = require('child_process').fork;
const cpusLen = require('os').cpus().length;
const path = require('path');
console.log(` 主过程:${process.pid}`);
for (let i = 0; i < cpusLen; i++) {fork(path.resolve(__dirname, './work.js'));
}
// work.js
const http = require('http');
console.log(` 工作过程:${process.pid}`);
http.createServer((req, res) => {res.end('hello');
}).listen(8080);
当运行 master.js
文件的时候,会报端口被占用的问题(Error: listen EADDRINUSE: address already in use :::8080
)。
咱们批改下,只应用主过程监听端口,主过程将申请套接字发放给工作过程,由工作过程来进行业务解决。
// master.js
const fork = require('child_process').fork;
const cpusLen = require('os').cpus().length;
const path = require('path');
const net = require('net');
const server = net.createServer();
console.log(` 主过程:${process.pid}`);
const works = [];
let current = 0
for (let i = 0; i < cpusLen; i++) {works.push(fork(path.resolve(__dirname, './work.js')));
}
server.listen(8080, () => {if (current > works.length - 1) current = 0
works[current++].send('server', server);
server.close();});
// work.js
const http = require('http');
const server = http.createServer((req, res) => {res.end('hello');
});
console.log(` 工作过程:${process.pid}`);
process.on('message', (type, tcp) => {if (type === 'server') {
tcp.on('connection', socket => {server.emit('connection', socket)
});
}
})
实际上,cluster
新建的工作过程并没有真正去监听端口,在工作过程中的 net server listen 函数会被 hack,工作过程调用 listen,不会有任何成果。监听端口工作交给了主过程,该端口对应的工作过程会被绑定到主过程中,当申请进来的时候,主过程会将申请的套接字下发给相应的工作过程,工作过程再对申请进行解决。
接下来咱们看看 cluster
API
中的实现,看下 cluster
外部是如何做到上面两个性能:
- 主过程:对传入的端口进行监听
-
工作过程:
- 主过程注册当前工作过程,如果主过程是第一次监听此端口,就新建一个 TCP 服务器,并将当前工作过程和 TCP 服务器绑定。
- hack 掉工作过程中的 listen 函数,让该过程不能监听端口
源码解读
本文应用的是 node@14.15.4。
// lib/cluster.js
'use strict';
const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'primary';
module.exports = require(`internal/cluster/${childOrPrimary}`);
这个是 cluster
API 入口,在援用cluster
的时候,程序首先会判断环境变量中是否存在 NODE_UNIQUE_ID
变量,来确定以后程序是在主过程运行还是工作过程中运行。NODE_UNIQUE_ID
实际上就是一个自增的数字,是工作过程的 ID,前面会在创立工作过程相干代码中看到,这里就不多做解释了。
通过后面代码咱们晓得,如果在主过程中援用 cluster
,程序导出的是internal/cluster/primary.js
这文件,因而咱们先看看这个文件外部的一些实现。
// internal/cluster/primary.js
// ...
const EventEmitter = require('events');
const cluster = new EventEmitter();
// 上面这三个参数会在 node 外部性能实现的时候用到,之后咱们看 net 源码的时候会用到这些参数
cluster.isWorker = false; // 是否是工作过程
cluster.isMaster = true; // 是否是主过程
cluster.isPrimary = true; // 是否是主过程
module.exports = cluster;
cluster.setupPrimary = function(options) {
const settings = {args: ArrayPrototypeSlice(process.argv, 2),
exec: process.argv[1],
execArgv: process.execArgv,
silent: false,
...cluster.settings,
...options
};
cluster.settings = settings;
// ...
}
cluster.setupMaster = cluster.setupPrimary;
cluster.fork = function(env) {cluster.setupPrimary();
const id = ++ids;
const workerProcess = createWorkerProcess(id, env);
}
const {fork} = require('child_process');
function createWorkerProcess(id, env) {
// 这里的 NODE_UNIQUE_ID 就是入口文件用来分辨以后过程类型用的
const workerEnv = {...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
// ...
return fork(cluster.settings.exec, cluster.settings.args, {
env: workerEnv,
// ...
});
}
cluster.fork
用来新建一个工作过程,其外部应用 child_process
中的 fork
函数,来创立一个过程,创立的新过程默认会运行命令行中执行的入口文件(process.argv[1]),当然咱们也能够执行 luster.setupPrimary
或者 cluster.setupMaster
并传入 exec
参数来批改工作过程执行的文件。
咱们再来简略看下工作过程援用的 internal/cluster/child.js
文件:
// internal/cluster/child.js
const EventEmitter = require('events');
const cluster = new EventEmitter();
module.exports = cluster;
// 这里定义的就是一个工作过程,后续会用到这里的参数
cluster.isWorker = true;
cluster.isMaster = false;
cluster.isPrimary = false;
cluster._getServer = function(obj, options, cb) {// ...};
// ...
这里咱们次要记住工作过程中的 cluster
有个 _getServer
函数,后续流程走到这个函数的时候,会具体看外面的代码。
接下来进入正题,看下 net server listen
函数:
// lib/net.js
Server.prototype.listen = function(...args) {
// ...
if (typeof options.port === 'number' || typeof options.port === 'string') {
// 如果是向最开始那种间接调用 listen 时间接传入一个端口,就会间接进入 else,咱们也次要看 else 中的逻辑
if (options.host) {// ...} else {// listen(8080, () => {...})调用形式,将运行这条分支
listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive);
}
return this;
}
// ...
}
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
// ...
// 这里就用到 cluster 初始时写入的 isPrimary 参数,以后如果在主过程 isPrimary 就为 true,反之为 false。主过程会间接去执行 server._listen2 函数,工作过程之后也会执行这个函数,等下一起看 server._listen2 外部的性能。if (cluster.isPrimary || exclusive) {server._listen2(address, port, addressType, backlog, fd, flags);
return;
}
// 前面的代码只有在工作过程中才会执行
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags,
};
// 这里执行的是 internal/cluster/child.js 中的 cluster._getServer,同时会传入 listenOnPrimaryHandle 这个回调函数,这个回调函数会在主过程增加端口监听,同时将工作过程绑定到对应的 TCP 服务后才会执行,外面工作就是对 net server listen 等函数进行 hack。cluster._getServer(server, serverQuery, listenOnPrimaryHandle);
function listenOnPrimaryHandle(err, handle) {
// ...
server._handle = handle;
server._listen2(address, port, addressType, backlog, fd, flags);
}
}
// 等工作过程执行这个函数的时候再一起讲
Server.prototype._listen2 = setupListenHandle;
function setupListenHandle(...) {// ...}
从下面代码中能够得悉,主过程和工作过程中执行 net server listen
都会进入到一个 setupListenHandle
函数中。不过区别是,主过程是间接执行该函数,而工作过程须要先执行 cluster._getServer
函数,让主过程监听工作过程端口,同时对 listen
函数进行 hack 解决,而后再执行 setupListenHandle
函数。接下来咱们看下 cluster._getServer
函数的外部实现。
// lib/internal/cluster/child.js
cluster._getServer = function(obj, options, cb) {
// ...
// 这个是工作过程第一次发送内部消息的内容。// 留神这里 act 值为 queryServer
const message = {
act: 'queryServer',
index,
data: null,
...options
};
// ...
// send 函数外部应用 IPC 信道向工作过程发送内部消息。主过程在应用 cluster.fork 新建工作过程的时候,会让工作过程监听内部消息事件,上面会展现具体代码
// send 调用传入的回调函数会被写入到 lib/internal/cluster/utils.js 文件中的 callbacks map 中,等前面要用的时候,再提取进去。send(message, (reply, handle) => {if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, index, cb);
else
// 这个函数外部会定义一个 listen 函数,用来 hack net server listen 函数
rr(reply, indexesKey, index, cb);
});
// ...
}
function send(message, cb) {return sendHelper(process, message, null, cb);
}
// lib/internal/cluster/utils.js
// ...
const callbacks = new SafeMap();
let seq = 0;
function sendHelper(proc, message, handle, cb) {message = { cmd: 'NODE_CLUSTER', ...message, seq};
if (typeof cb === 'function')
// 这里将传入的回调函数记录下来。// 留神这里的 key 是递增数字
callbacks.set(seq, cb);
seq += 1;
// 利用 IPC 信道,给当前工作过程发送内部消息
return proc.send(message, handle);
}
// ...
工作过程中 cluster._getServer
函数执行,将生成一个回调函数,将这个回调函数寄存起来,并且会应用 IPC 信道,向当前工作过程发送内部消息。主过程执行 cluster.fork
生成工作过程的时候,会在工作过程中注册 internalMessage
事件。接下来咱们看下 cluster.fork
中与工作过程注册内部消息事件的代码。
// internal/cluster/primary.js
cluster.fork = function(env) {
// ...
// internal 函数执行会返回一个接管 message 对象的回调函数。// 能够先看下 lib/internal/cluster/utils.js 中的 internal 函数,理解外部的工作
worker.process.on('internalMessage', internal(worker, onmessage));
// ...
}
const methodMessageMapping = {
close,
exitedAfterDisconnect,
listening,
online,
queryServer,
};
// 第一次触发 internalMessage 执行的回调是这个函数。// 此时 message 的 act 为 queryServer
function onmessage(message, handle) {
// internal 外部在执行 onmessage 时会将这个函数执行上下文绑定到工作过程的 work 上
const worker = this;
// 工作过程传入的
const fn = methodMessageMapping[message.act];
if (typeof fn === 'function')
fn(worker, message);
}
function queryServer(worker, message) {// ...}
// lib/internal/cluster/utils.js
// ...
const callbacks = new SafeMap();
function internal(worker, cb) {return function onInternalMessage(message, handle) {
let fn = cb;
// 工作过程第一次发送内部消息:ack 为 undefined,callback 为 undefined,间接执行 internal 调用传入的 onmessage 函数,message 函数只是用于解析音讯的,理论会执行 queryServer 函数
// 工作过程第二次发送内部消息:主过程 queryServer 函数执行会用工作过程发送内部消息,并向 message 中增加 ack 参数,让 message.ack=message.seq
if (message.ack !== undefined) {const callback = callbacks.get(message.ack);
if (callback !== undefined) {
fn = callback;
callbacks.delete(message.ack);
}
}
ReflectApply(fn, worker, arguments);
};
}
工作过程第一次发送内部消息时,因为传入的 message.ack
(这里留神分清act
和ack
)为 undefind
,因而没方法间接拿到cluster._getServer
中调用 send
写入的回调函数,因而只能先执行 internal/cluster/primary.js
中的 queryServer
函数。接下来看下 queryServer
函数外部逻辑。
// internal/cluster/primary.js
// hadles 中寄存的就是 TCP 服务器。// 主过程在代替工作过程监听端口生成新的 TCP 服务器前,// 须要先判断该服务器是否有创立,如果有,就间接复用之前的服务器,而后将工作过程绑定到相应的服务器上;如果没有,就新建一个 TCP 服务器,而后将工作过程绑定到新建的服务器上。function queryServer(worker, message) {
// 这里 key 就是服务器的惟一标识
const key = `${message.address}:${message.port}:${message.addressType}:` +
`${message.fd}:${message.index}`;
// 从现存的服务器中查看是否有以后须要的服务器
let handle = handles.get(key);
// 如果没有须要的服务器,就新建一个
if (handle === undefined) {
// ...
// RoundRobinHandle 构建函数中,会新建一个 TCP 服务器
let constructor = RoundRobinHandle;
handle = new constructor(key, address, message);
// 将这个服务器寄存起来
handles.set(key, handle);
}
if (!handle.data)
handle.data = message.data;
// 能够先看下上面对于 RoundRobinHandle 构建函数的代码,理解外部机制
handle.add(worker, (errno, reply, handle) => {const { data} = handles.get(key);
if (errno)
handles.delete(key);
// 这里会向工作过程中发送第二次内部消息。// 这里只传了 worker 和 message,没有传入 handle 和 cb
send(worker, {
errno,
key,
ack: message.seq, // 留神这里减少了 ack 属性
data,
...reply
}, handle);
});
}
function send(worker, message, handle, cb) {return sendHelper(worker.process, message, handle, cb);
}
// internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, { port, fd, flags}) {
// ...
this.server = net.createServer(assert.fail);
if (fd >= 0)
this.server.listen({fd});
else if (port >= 0) {
this.server.listen({
port,
host: address,
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
});
} else
this.server.listen(address);
// 当服务处于监听状态,就会执行这个回调。this.server.once('listening', () => {
this.handle = this.server._handle;
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
// 留神:如果监听胜利,就会将 server 删除
this.server = null;
});
}
RoundRobinHandle.prototype.add = function(worker, send) {const done = () => {if (this.handle.getsockname) {
// ...
send(null, { sockname: out}, null);
} else {send(null, null, null); // UNIX socket.
}
// ...
};
// 如果在 add 执行前 server 就曾经处于 listening 状态,this.server 就会为 null
if (this.server === null)
return done();
// 如果 add 执行后,server 才处于 listening,就会走到这里,始终都会执行 add 调用时传入的回调
this.server.once('listening', done);
}
在这一步,主过程替工作过程生成或者是获取了一个可用的 TCP 服务器,并将工作过程与相应的服务器绑定在一起(不便后续申请任务分配)。当工作过程绑定实现当前,就向工作过程中发送了第二次内部消息,接下来咱们再次进入 lib/internal/cluster/utils.js
看看外部流程:
// lib/internal/cluster/utils.js
const callbacks = new SafeMap();
function internal(worker, cb) {
// 留神这里 handle 为 undefined
return function onInternalMessage(message, handle) {
let fn = cb;
// 第二次工作过程内部消息执行的时候 message.ack 曾经被赋值为 message.seq
// 因而这次可能获取到之前 lib/cluster.child.js cluster._getServer 函数执行是调用 send 写入的回调函数
if (message.ack !== undefined) {const callback = callbacks.get(message.ack);
if (callback !== undefined) {
fn = callback;
callbacks.delete(message.ack);
}
}
ReflectApply(fn, worker, arguments);
};
}
工作过程第二次承受到内部消息时,cluster._getServer 函数执行是调用 send 写入的回调函数会被执行,接下来看下 send 写入的回调函数内容:
// lib/internal/cluster/child.js
send(message, (reply, handle) => {
// 此时 handle 为 undefined,流程会间接运行 rr 函数
if (handle)
shared(reply, handle, indexesKey, index, cb);
else
// 这里的 cb 是 lib/net.js 在执行 cluster._getServer 时传入 listenOnPrimaryHandle 函数,前面会介绍他的工作。rr(reply, indexesKey, index, cb);
});
function rr(message, indexesKey, index, cb) {
let key = message.key;
// 这里定义的 listen 用于 hack net server.listen,在工作过程中执行 listen,工作过程并不会真正去监听端口
function listen(backlog) {return 0;}
function close() {...}
function getsockname(out) {...}
const handle = {close, listen, ref: noop, unref: noop};
handles.set(key, handle);
// 执行传入的 listenOnPrimaryHandle 函数
cb(0, handle);
}
rr
函数执行,会新建几个与 net server
中同名的函数,并通过 handle 传入 listenOnPrimaryHandle
函数。
// lib/net.js
function listenInCluster(...) {cluster._getServer(server, serverQuery, listenOnPrimaryHandle);
// listenOnPrimaryHandle 函数中将工作过程生成的 server._handle 对象替换成自定义的 handle 对象,后续 server listen 执行的就是 server._handle 中的 listen 函数,因而这里就实现了对工作过程中的 listen 函数 hack
function listenOnPrimaryHandle(err, handle) {
// ...
// handle:{listen: ..., close: ...., ...}
server._handle = handle;
server._listen2(address, port, addressType, backlog, fd, flags);
}
}
上面看下 server._listen2
函数执行内容
Server.prototype._listen2 = setupListenHandle;
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
// 疏忽,只有是从工作过程进来的,this._handle 就是本人定义的对象内容
if (this._handle) {debug('setupListenHandle: have a handle already');
} else {
// 主过程会进入这一层逻辑,会在这里生成一个服务器
// ...
rval = createServerHandle(address, port, addressType, fd, flags);
// ...
this._handle = rval;
}
const err = this._handle.listen(backlog || 511);
// ...
}
至此,工作过程端口监听相干的源码就看完了,当初差不多理解到工作过程中执行 net server listen
时,工作过程并不会真正去监听端口,端口监听工作始终会交给主过程来实现。主过程在接到工作过程发来的端口监听的时候,首先会判断是否有雷同的服务器,如果有,就间接将工作过程绑定到对应的服务器上,这样就不会呈现端口被占用的问题;如果没有对应的服务器,就生成一个新的服务。主过程承受到申请的时候,就会将申请任务分配给工作过程,如何调配,就须要看具体应用的哪种负载平衡了。