乐趣区

Node.js – 阿里Egg的多进程模型和进程间通讯

前言
最近用 Egg 作为底层框架开发项目,好奇其多进程模型的管理实现,于是学习了解了一些东西,顺便记录下来。文章如有错误,请轻喷
为什么需要多进程
伴随科技的发展,现在的服务器基本上都是多核 cpu 的了。然而,Node 是一个单进程单线程语言(对于开发者来说是单线程,实际上不是)。我们都知道,cpu 的调度单位是线程,而基于 Node 的特性,那么我们每次只能利用一个 cpu。这样不仅仅利用率极低,而且容错更是不能接受(出错时会崩溃整个程序)。所以,Node 有了 cluster 来协助我们充分利用服务器的资源。
cluster 工作原理 关于 cluster 的工作原理推荐大家看这篇文章, 这里简单总结一下:

子进程的端口监听会被 hack 掉,而是统一由 master 的内部 TCP 监听,所以不会出现多个子进程监听同一端口而报错的现象。

请求统一经过 master 的内部 TCP,TCP 的请求处理逻辑中,会挑选一个 worker 进程向其发送一个 newconn 内部消息,随消息发送客户端句柄。(这里的挑选有两种方式,第一种是除 Windows 外所有平台的默认方法循环法,即由主进程负责监听端口,接收新连接后再将连接循环分发给工作进程。在分发中使用了一些内置技巧防止工作进程任务过载。第二种是主进程创建监听 socket 后发送给感兴趣的工作进程,由工作进程负责直接接收连接。)
worker 进程收到句柄后,创建客户端实例 (net.socket) 执行具体的业务逻辑,然后返回。

如图:图引用出处
多进程模型
先看一下 Egg 官方文档的进程模型
+——–+ +——-+
| Master |<——–>| Agent |
+——–+ +——-+
^ ^ ^
/ | \
/ | \
/ | \
v v v
+———-+ +———-+ +———-+
| Worker 1 | | Worker 2 | | Worker 3 |
+———-+ +———-+ +———-+

类型
进程数量
作用
稳定性
是否运行业务代码

Master
1
进程管理,进程间消息转发
非常高

Agent
1
后台运行工作(长连接客户端)

少量

Worker
一般为 cpu 核数
执行业务代码
一般

大致上就是利用 Master 作为主线程,启动 Agent 作为秘书进程协助 Worker 处理一些公共事务(日志之类),启动 Worker 进程执行真正的业务代码。
多进程的实现
流程相关代码
首先从 Master 入手,这里暂时认为 Master 是最顶级的进程(事实上还有一个 parent 进程,待会再说)。
/**
* start egg app
* @method Egg#startCluster
* @param {Object} options {@link Master}
* @param {Function} callback start success callback
*/
exports.startCluster = function(options, callback) {
new Master(options).ready(callback);
};
先从 Master 的构造函数看起
constructor(options) {
super();
// 初始化参数
this.options = parseOptions(options);
// worker 进程的管理类 详情见 Manager 及 Messenger 篇
this.workerManager = new Manager();
// messenger 类,详情见 Manager 及 Messenger 篇
this.messenger = new Messenger(this);
// 设置一个 ready 事件 详情见 get-ready npm 包
ready.mixin(this);
// 是否为生产环境
this.isProduction = isProduction();
this.agentWorkerIndex = 0;
// 是否关闭
this.closed = false;

接下来看的是 ready 的回调函数及注册的各类事件:
this.ready(() => {
// 将开始状态设置为 true
this.isStarted = true;
const stickyMsg = this.options.sticky ? ‘ with STICKY MODE!’ : ”;
this.logger.info(‘[master] %s started on %s (%sms)%s’,
frameworkPkg.name, this[APP_ADDRESS], Date.now() – startTime, stickyMsg);

// 发送 egg-ready 至各个进程并触发相关事件
const action = ‘egg-ready’;
this.messenger.send({action, to: ‘parent’, data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({action, to: ‘app’, data: this.options});
this.messenger.send({action, to: ‘agent’, data: this.options});
// start check agent and worker status
this.workerManager.startCheck();
});
// 注册各类事件
this.on(‘agent-exit’, this.onAgentExit.bind(this));
this.on(‘agent-start’, this.onAgentStart.bind(this));

// 检查端口并 Fork 一个 Agent
detectPort((err, port) => {

this.forkAgentWorker();
}
});
}
综上,可以看到 Master 的构造函数主要是初始化和注册各类相应的事件,最后运行的是 forkAgentWorker 函数,该函数的关键代码可以看到:
const agentWorkerFile = path.join(__dirname, ‘agent_worker.js’);
// 通过 child_process 执行一个 Agent
const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
继续到 agent_worker.js 上面看,agent_worker 实例化一个 agent 对象,agent_worker.js 有一句关键代码:
agent.ready(() => {
agent.removeListener(‘error’, startErrorHandler); // 清除错误监听的事件
process.send({action: ‘agent-start’, to: ‘master’}); // 向 master 发送一个 agent-start 的动作
});
可以看到,agent_worker.js 中的代码向 master 发出了一个信息,动作为 agent-start,再回到 Master 中,可以看到其注册了两个事件,分别为 once 的 forkAppWorkers 和 on 的 onAgentStart
this.on(‘agent-start’, this.onAgentStart.bind(this));
this.once(‘agent-start’, this.forkAppWorkers.bind(this));
先看 onAgentStart 函数,这个函数相对简单,就是一些信息的传递:
onAgentStart() {
this.agentWorker.status = ‘started’;

// Send egg-ready when agent is started after launched
if (this.isAllAppWorkerStarted) {
this.messenger.send({action: ‘egg-ready’, to: ‘agent’, data: this.options});
}

this.messenger.send({action: ‘egg-pids’, to: ‘app’, data: [ this.agentWorker.pid] });
// should send current worker pids when agent restart
if (this.isStarted) {
this.messenger.send({action: ‘egg-pids’, to: ‘agent’, data: this.workerManager.getListeningWorkerIds() });
}

this.messenger.send({action: ‘agent-start’, to: ‘app’});
this.logger.info(‘[master] agent_worker#%s:%s started (%sms)’,
this.agentWorker.id, this.agentWorker.pid, Date.now() – this.agentStartTime);
}
然后会执行 forkAppWorkers 函数,该函数主要是借助 cfork 包 fork 对应的工作进程,并注册一系列相关的监听事件,

cfork({
exec: this.getAppWorkerFile(),
args,
silent: false,
count: this.options.workers,
// don’t refork in local env
refork: this.isProduction,
});

// 触发 app-start 事件
cluster.on(‘listening’, (worker, address) => {
this.messenger.send({
action: ‘app-start’,
data: {workerPid: worker.process.pid, address},
to: ‘master’,
from: ‘app’,
});
});
可以看到 forkAppWorkers 函数在监听 Listening 事件时,会触发 master 上的 app-start 事件。
this.on(‘app-start’, this.onAppStart.bind(this));


// master ready 回调触发
if (this.options.sticky) {
this.startMasterSocketServer(err => {
if (err) return this.ready(err);
this.ready(true);
});
} else {
this.ready(true);
}

// ready 回调 发送 egg-ready 状态到各个进程
const action = ‘egg-ready’;
this.messenger.send({action, to: ‘parent’, data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({action, to: ‘app’, data: this.options});
this.messenger.send({action, to: ‘agent’, data: this.options});

// start check agent and worker status
if (this.isProduction) {
this.workerManager.startCheck();
}
总结下:

Master.constructor: 先执行 Master 的构造函数,里面有个 detect 函数被执行
Detect: Detect => forkAgentWorker()
forkAgentWorker: 获取 Agent 进程,向 master 触发 agent-start 事件
执行 onAgentStart 函数,执行 forkAppWorker 函数(once)
onAgentStart => 发送各类信息, forkAppWorker => 向 master 触发 app-start 事件
App-start 事件 触发 onAppStart()方法
onAppStart => 设置 ready(true) => 执行 ready 的回调函数
Ready() = > 发送 egg-ready 到各个进程并触发相关事件, 执行 startCheck()函数

+———+ +———+ +———+
| Master | | Agent | | Worker |
+———+ +—-+—-+ +—-+—-+
| fork agent | |
+——————–>| |
| agent ready | |
|<——————–+ |
| | fork worker |
+—————————————–>|
| worker ready | |
|<—————————————–+
| Egg ready | |
+——————–>| |
| Egg ready | |
+—————————————–>|
进程守护
根据官方文档,进程守护主要是依赖于 graceful 和 egg-cluster 这两个库。
未捕获异常

关闭异常 Worker 进程所有的 TCP Server(将已有的连接快速断开,且不再接收新的连接),断开和 Master 的 IPC 通道,不再接受新的用户请求。
Master 立刻 fork 一个新的 Worker 进程,保证在线的『工人』总数不变。
异常 Worker 等待一段时间,处理完已经接受的请求后退出。

+———+ +———+
| Worker | | Master |
+———+ +—-+—-+
| uncaughtException |
+————+ |
| | | +———+
| <———-+ | | Worker |
| | +—-+—-+
| disconnect | fork a new worker |
+————————-> + ———————> |
| wait… | |
| exit | |
+————————-> | |
| | |
die | |
| |
| |
由执行的 app 文件可知,app 实际上是继承于 Application 类, 该类下面调用了 graceful()。
onServer(server) {
……
graceful({
server: [server],
error: (err, throwErrorCount) => {
……
},
});
……
}
继续看 graceful,可以看到它捕获了 process.on(‘uncaughtException’)事件,并在回调函数里面关闭 TCP 连接,关闭本身进程,断开与 master 的 IPC 通道。
process.on(‘uncaughtException’, function (err) {
……
// 对 http 连接设置 Connection: close 响应头
servers.forEach(function (server) {
if (server instanceof http.Server) {
server.on(‘request’, function (req, res) {
// Let http server set `Connection: close` header, and close the current request socket.
req.shouldKeepAlive = false;
res.shouldKeepAlive = false;
if (!res._header) {
res.setHeader(‘Connection’, ‘close’);
}
});
}
});

// 设置一个定时函数关闭子进程,并退出本身进程
// make sure we close down within `killTimeout` seconds
var killtimer = setTimeout(function () {
console.error(‘[%s] [graceful:worker:%s] kill timeout, exit now.’, Date(), process.pid);
if (process.env.NODE_ENV !== ‘test’) {
// kill children by SIGKILL before exit
killChildren(function() {
// 退出本身进程
process.exit(1);
});
}
}, killTimeout);

// But don’t keep the process open just for that!
// If there is no more io waitting, just let process exit normally.
if (typeof killtimer.unref === ‘function’) {
// only worked on node 0.10+
killtimer.unref();
}

var worker = options.worker || cluster.worker;

// cluster mode
if (worker) {
try {
// 关闭 TCP 连接
for (var i = 0; i < servers.length; i++) {
var server = servers[i];
server.close();
}
} catch (er1) {
……
}

try {
// 关闭 ICP 通道
worker.disconnect();
} catch (er2) {
……
}
}
});
ok, 关闭了 IPC 通道后,我们继续看 cfork 文件,即上面提到的 fork worker 的包,里面监听了子进程的 disconnect 事件,他会根据条件判断是否重新 fork 一个新的子进程
cluster.on(‘disconnect’, function (worker) {
……
// 存起该 pid
disconnects[worker.process.pid] = utility.logDate();
if (allow()) {
// fork 一个新的子进程
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
} else {
……
}
});
一般来说,这个时候会继续等待一会然后就执行了上面说到的定时函数了,即退出进程。
OOM、系统异常关于这种系统异常,有时候在子进程中是不能捕获到的,我们只能在 master 中进行处理,也就是 cfork 包。
cluster.on(‘exit’, function (worker, code, signal) {
// 是程序异常的话,会通过上面提到的 uncatughException 重新 fork 一个子进程,所以这里就不需要了
var isExpected = !!disconnects[worker.process.pid];
if (isExpected) {
delete disconnects[worker.process.pid];
// worker disconnect first, exit expected
return;
}
// 是 master 杀死的子进程,无需 fork
if (worker.disableRefork) {
// worker is killed by master
return;
}

if (allow()) {
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
} else {
……
}
cluster.emit(‘unexpectedExit’, worker, code, signal);
});
进程间通信(IPC)
上面一直提到各种进程间通信,细心的你可能已经发现 cluster 的 IPC 通道只存在于 Master 和 Worker/Agent 之间,Worker 与 Agent 进程互相间是没有的。那么 Worker 之间想通讯该怎么办呢?是的,通过 Master 来转发。
广播消息:agent => all workers
+——–+ +——-+
| Master |<———| Agent |
+——–+ +——-+
/ | \
/ | \
/ | \
/ | \
v v v
+———-+ +———-+ +———-+
| Worker 1 | | Worker 2 | | Worker 3 |
+———-+ +———-+ +———-+

指定接收方:one worker => another worker
+——–+ +——-+
| Master |———-| Agent |
+——–+ +——-+
^ |
send to / |
worker 2 / |
/ |
/ v
+———-+ +———-+ +———-+
| Worker 1 | | Worker 2 | | Worker 3 |
+———-+ +———-+ +———-+
在 master 中,可以看到当 agent 和 app 被 fork 时,会监听他们的信息,同时将信息转化成一个对象:
agentWorker.on(‘message’, msg => {
if (typeof msg === ‘string’) msg = {action: msg, data: msg};
msg.from = ‘agent’;
this.messenger.send(msg);
});

worker.on(‘message’, msg => {
if (typeof msg === ‘string’) msg = {action: msg, data: msg};
msg.from = ‘app’;
this.messenger.send(msg);
});
可以看到最后调用的是 messenger.send, 而 messengeer.send 就是根据 from 和 to 来决定将信息发送到哪里
send(data) {
if (!data.from) {
data.from = ‘master’;
}
……

// app -> master
// agent -> master
if (data.to === ‘master’) {
debug(‘%s -> master, data: %j’, data.from, data);
// app/agent to master
this.sendToMaster(data);
return;
}

// master -> parent
// app -> parent
// agent -> parent
if (data.to === ‘parent’) {
debug(‘%s -> parent, data: %j’, data.from, data);
this.sendToParent(data);
return;
}

// parent -> master -> app
// agent -> master -> app
if (data.to === ‘app’) {
debug(‘%s -> %s, data: %j’, data.from, data.to, data);
this.sendToAppWorker(data);
return;
}

// parent -> master -> agent
// app -> master -> agent,可能不指定 to
if (data.to === ‘agent’) {
debug(‘%s -> %s, data: %j’, data.from, data.to, data);
this.sendToAgentWorker(data);
return;
}
}
master 则是直接根据 action 信息 emit 对应的注册事件
sendToMaster(data) {
this.master.emit(data.action, data.data);
}
而 agent 和 worker 则是通过一个 sendmessage 包,实际上就是调用下面类似的方法
// 将信息传给子进程
agent.send(data)
worker.send(data)
最后,在 agent 和 app 都继承的基础类 EggApplication 上,调用了 Messenger 类,该类内部的构造函数如下:
constructor() {
super();
……
this._onMessage = this._onMessage.bind(this);
process.on(‘message’, this._onMessage);
}

_onMessage(message) {
if (message && is.string(message.action)) {
// 和 master 一样根据 action 信息 emit 对应的注册事件
this.emit(message.action, message.data);
}
}
总结一下:思路就是利用事件机制和 IPC 通道来达到各个进程之间的通信。
其他
学习过程中有遇到一个 timeout.unref()的函数,关于该函数推荐大家参考这个问题的 6 楼回答
总结
从前端思维转到后端思维其实还是很吃力的,加上 Egg 的进程管理实现确实非常厉害,所以花了很多时间在各种 api 和思路思考上。
参考与引用
多进程模型和进程间通讯 Egg 源码解析之 egg-cluster

退出移动版