关于node.js:nodejs篇进程与集群cluster

41次阅读

共计 8697 个字符,预计需要花费 22 分钟才能阅读完成。

咱们启动一个服务、运行一个实例,就是开一个服务过程,Node.js 里通过 node app.js 开启一个服务过程,多过程就是过程的复制(fork),fork 进去的每个过程都领有本人的独立空间地址、数据栈,一个过程无法访问另外一个过程里定义的变量、数据结构,只有建设了 IPC 通信,过程之间才可数据共享。

child_process

node.js中能够通过上面四种形式创立子过程:

  • child_process.spawn(command, args)
  • child_process.exec(command, options)
  • child_process.execFile(file, args[, callback])
  • child_process.fork(modulePath, args)

spawn

const {spawn} = require("child_process");
// 创立 文件
spawn("touch",["index.js"]);

spawn()会返回 child-process 子过程实例:

const {spawn} = require("child_process");
// cwd 指定子过程的工作目录,默认当前目录
const child = spawn("ls",["-l"],{cwd:__dirname});
// 输入过程信息
child.stdout.pipe(process.stdout);
console.log(process.pid,child.pid);

子过程同样基于事件机制(EventEmitter API),提供了一些事件:

  • exit:子过程退出时触发,能够得悉过程退出状态(code 和 signal)
  • disconnect:父过程调用 child.disconnect()时触发
  • error:子过程创立失败,或被 kill 时触发
  • close:子过程的 stdio 流(规范输入输出流)敞开时触发
  • message:子过程通过 process.send()发送音讯时触发,父子过程音讯通信

close 与 exit 的区别次要体现在多过程共享同一 stdio 流的场景,某个过程退出了并不意味着 stdio 流被敞开了

子过程具备可读流的个性,利用可读流实现find . -type f | wc -l,递归统计当前目录文件数量:

const {spawn} = require('child_process');

const find = spawn('find', ['.', '-type', 'f']);
const wc = spawn('wc', ['-l']);

find.stdout.pipe(wc.stdin);

wc.stdout.on('data', (data) => {console.log(`Number of files ${data}`);
});

exec

spawn()exec() 办法的区别在于,exec()不是基于 stream 的,exec()会将传入命令的执行后果暂存到 buffer 中,再整个传递给回调函数。

spawn()默认不会创立 shell 去执行命令(性能上会稍好),而 exec() 办法执行是会先创立 shell,所以能够在 exec() 办法中传入任意 shell 脚本。

const {exec} = require("child_process");

exec("node -v",(error,stdout,stderr)=>{if (error) console.log(error);
    console.log(stdout)
})

exec()办法因为能够传入任意 shell 脚本所以存在平安危险。

spawn()办法默认不会创立 shell 去执行传入的命令(所以性能上略微好一点),不过能够通过参数实现:

const {spawn} = require('child_process');
const child = spawn('node -v', {shell: true});
child.stdout.pipe(process.stdout);

这种做法的益处是,既能反对 shell 语法,也能通过 stream IO 进行规范输入输出。

execFile

const {execFile} = require("child_process");

execFile("node",["-v"],(error,stdout,stderr)=>{console.log({ error, stdout, stderr})
    console.log(stdout)
})

通过可执行文件门路执行:

const {execFile} = require("child_process");

execFile("/Users/.nvm/versions/node/v12.1.0/bin/node",["-v"],(error,stdout,stderr)=>{console.log({ error, stdout, stderr})
    console.log(stdout)
})

fork

fork()办法能够用来创立 Node 过程,并且父子过程能够相互通信

//master.js
const {fork} = require("child_process");
const worker = fork("worker.js");

worker.on("message",(msg)=>{console.log(`from worder:${msg}`)
});
worker.send("this is master");

// worker.js
process.on("message",(msg)=>{console.log("worker",msg)
});
process.send("this is worker");

利用 fork() 能够用来解决计算量大,耗时长的工作:

const longComputation = () => {
  let sum = 0;
  for (let i = 0; i < 1e10; i++) {sum += i;};
  return sum;
};

longComputation 办法拆分到子过程中,这样主过程的事件循环不会被耗时计算阻塞:

const http = require('http');
const {fork} = require('child_process');

const server = http.createServer();

server.on('request', (req, res) => {if (req.url === '/compute') {
    // 将计算量大的工作,拆分到子过程中解决
    const compute = fork('compute.js');
    compute.send('start');
    compute.on('message', sum => {
        // 收到子过程工作后,返回
      res.end(`Sum is ${sum}`);
    });
  } else {res.end('Ok')
  }
});

server.listen(3000);

过程间通信 IPC

每个过程各自有不同的用户地址空间,任何一个过程的全局变量在另一个过程中都看不到,所以过程之间要替换数据必须通过内核,在内核中开拓一块缓冲区,过程 1 把数据从用户空间拷到内核缓冲区,过程 2 再从内核缓冲区把数据读走,内核提供的这种机制称为 过程间通信(IPC,InterProcess Communication)

过程之间能够借助内置的 IPC 机制通信

父过程:

  • 接管事件process.on('message')
  • 发送信息给子过程master.send()

子过程:

  • 接管事件process.on('message')
  • 发送信息给父过程process.send()

fork 多过程

nodejs 中的多过程是 多过程 + 单线程 的模式

// master.js. 
process.title = 'node-master'
const net = require("net");
const {fork} = require("child_process");

const handle = net._createServerHandle("127.0.0.1",3000);

for(let i=0;i<4;i++){fork("./worker.js").send({},handle);
}

// worker.js
process.title = 'worker-master';

const net = require("net");

process.on("message",(msg,handle)=>start(handle));

const buf = "hello nodejs";
const res= ["HTTP/1.1 200 ok","content-length:"+buf.length].join("\r\n")+"\r\n\r\n"+buf;

function start(server){server.listen();
    let num=0;
    server.onconnection = function(err,handle){
        num++;
        console.log(`worker ${process.pid} num ${num}`);
        let socket = new net.Socket({handle});
        socket.readable = socket.writable = true
        socket.end(res);
    }
}

运行 node master.js,这里能够应用测试工具 Siege:

siege -c 20 -r 10 http://localhost:3000

-c 并发量,并发数为 20 人 -r 是反复次数,反复 10 次

这种创立过程的特点是:

  • 在一个服务上同时启动多个过程
  • 每个过程运行同样的代码(start 办法)
  • 多个过程能够同时监听一个端口(3000)

不过每次申请过去交给哪个 worker 解决,master并不分明,咱们更心愿 master 可能掌控全局,将申请指定给worker,咱们做上面的革新:

//master.js
process.title = 'node-master'
const net =require("net");
const {fork} = require("child_process");

// 定义 workers 变量,保留子过程 worker
let workers = [];
for(let i=0;i<4;i++){workers.push(fork("./worker.js"));
}
const handle = net._createServerHandle("0.0.0.0", 3000)
handle.listen();
// master 管制申请
handle.onconnection = function(err,handle){let worker = workers.pop();
    // 将申请传递给子过程
    worker.send({},handle);
    workers.unshift(worker);
}

// worker.js
process.title = 'worker-master';
const net = require("net")
process.on("message", (msg, handle) => start(handle))

const buf = "hello nodejs"
const res = ["HTTP/1.1 200 ok", "content-length:" + buf.length].join("\r\n") + "\r\n\r\n" + buf

function start(handle) {console.log(`get a connection on worker,pid = %d`, process.pid)
  let socket = new net.Socket({handle})
  socket.readable = socket.writable = true
  socket.end(res)
}

Cluster 多过程

Node.js 官网提供的 Cluster 模块不仅充分利用机器 CPU 内核开箱即用的解决方案,还有助于 Node 过程减少可用性的能力,Cluster模块是对多过程服务能力的封装。

// master.js
const cluster = require("cluster");
const numCPUS = require("os").cpus().length;

if(cluster.isMaster){console.log(`master start...`)
    for(let i=0;i<numCPUS;i++){cluster.fork();
    };

    cluster.on("listening",(worker,address)=>{console.log(`master listing worker pid ${worker.process.pid} address port:${address.port}`)
    })

}else if(cluster.isWorker){require("./wroker.js")
}
//wroker.js
const http = require("http");
http.createServer((req,res)=>res.end(`hello`)).listen(3000)

过程重启和守护

过程重启

为了减少服务器的可用性,咱们心愿实例在呈现解体或者异样退出时,可能主动重启。

//master.js
const cluster = require("cluster")
const numCPUS = require("os").cpus().length

if (cluster.isMaster) {console.log("master start..")
  for (let i = 0; i < numCPUS; i++) {cluster.fork()
    }
  cluster.on("listening", (worker, address) => {console.log("listening worker pid" + worker.process.pid)
  })
  cluster.on("exit", (worker, code, signal) => {
      // 子过程出现异常或者奔败退出
    if (code !== 0 && !worker.exitedAfterDisconnect) {console.log(` 工作过程 ${worker.id} 解体了,正在开始一个新的工作过程 `)
      // 从新开启子过程
      cluster.fork()}
  })
} else if (cluster.isWorker) {require("./server")
}
const http = require("http")
const server = http.createServer((req, res) => {
    // 随机触发谬误
  if (Math.random() > 0.5) {throw new Error(`worker error pid=${process.pid}`)
  }
  res.end(`worker pid:${process.pid} num:${num}`)
}).listen(3000)

如果申请抛出异样而完结子过程,主过程可能监听到完结事件,重启开启子过程。

下面的重启只是简略解决,真正我的项目中要思考到的就很多了,这里能够参考 egg 的多过程模型和过程间通信。

上面是来自文章 Node.js 进阶之过程与线程更全面的例子:

// master.js
const {fork} = require("child_process");
const numCPUS = require("os").cpus().length;

const server = require("net").createServer();
server.listen(3000);
process.title="node-master";

const workers = {};
const createWorker = ()=>{const worker = fork("worker.js");
    worker.on("message",message=>{if(message.act==="suicide"){createWorker();
        }
    })

    worker.on("exit",(code,signal)=>{console.log('worker process exited,code %s signal:%s',code,signal);
        delete workers[worker.pid];
    });

    worker.send("server",server);
    workers[worker.pid] = worker;
    console.log("worker process created,pid %s ppid:%s", worker.pid, process.ppid)
}

for (let i = 0; i < numCPUS; i++) {createWorker()
}

process.once("SIGINT",close.bind(this,"SIGINT")); // kill(2) Ctrl+C
process.once("SIGQUIT", close.bind(this, "SIGQUIT")) // kill(3) Ctrl+l
process.once("SIGTERM", close.bind(this, "SIGTERM")) // kill(15) default
process.once("exit", close.bind(this))

function close(code){console.log('process exit',code);
    if(code!=0){for(let pid in workers){console.log('master process exit,kill worker pid:',pid);
            workers[pid].kill("SIGINT");
        }
    };
    process.exit(0);
}
//worker.js
const http=require("http");
const server = http.createServer((req,res)=>{res.writeHead(200,{"Content-Type":"text/plain"});
    res.end(`worker pid:${process.pid},ppid:${process.ppid}`)
    throw new Error("worker process exception!");
});

let worker;
process.title = "node-worker";
process.on("message",(message,handle)=>{if(message==="server"){
        worker = handle;
        worker.on("connection",socket=>{server.emit("connection",socket)
        })
    }
})
process.on("uncaughtException",(error)=>{console.log('some error')
    process.send({act:"suicide"});
    worker.close(()=>{console.log(process.pid+"close")
        process.exit(1);
    })
})

这个例子思考更加周到,通过 uncaughtException 捕捉子过程异样后,发送信息给主过程重启,并在链接敞开后退出。

过程守护

pm2 能够使服务在后盾运行不受终端的影响,这里次要通过两步解决:

  • options.detached:为 true 时运行子过程在父过程退出后持续运行
  • unref() 办法能够断绝跟父过程的关系,使父过程退出后子过程不会跟着退出
const {spawn} = require("child_process")

function startDaemon() {const daemon = spawn("node", ["daemon.js"], {
    // 当前工作目录
    cwd: __dirname,
    // 作为独立过程存在
    detached: true,
    // 漠视输入输出流
    stdio: "ignore",
  })
  console.log(` 守护过程 ppid:%s pid:%s`, process.pid, daemon.pid)
  // 断绝父子过程关系
  daemon.unref()}

startDaemon()
// daemon.js
const fs = require("fs")
const {Console} = require("console");
// 输入日志
const logger = new Console(fs.createWriteStream("./stdout.log"),fs.createWriteStream("./stderr.log"));
// 放弃过程始终在后盾运行
setInterval(()=>{logger.log("daemon pid:",process.pid,"ppid:",process.ppid)
},1000*10);

// 生成敞开文件
fs.writeFileSync("./stop.js", `process.kill(${process.pid}, "SIGTERM")`)

参考链接

  • node.js 应用 cluster 实现多过程
  • Nodejs 进阶:如何玩转子过程(child_process)
  • Node.js 进阶之过程与线程
  • Nodejs 过程间通信
  • Node.js 集群(cluster):扩大你的 Node.js 利用
  • eggjs- 多过程模型和过程间通信

正文完
 0